diff --git a/deploy/samples/logicaldb.yaml b/deploy/samples/logicaldb.yaml index 87aa3b65..0690d0a5 100644 --- a/deploy/samples/logicaldb.yaml +++ b/deploy/samples/logicaldb.yaml @@ -21,5 +21,5 @@ metadata: name: logical-offline spec: url: jdbc:logical://nearline=ads-database;offline=ads-catalog-database - schema: LOGICAL_OFFLINE + schema: LOGICAL-OFFLINE dialect: Calcite diff --git a/docs/extending/index.md b/docs/extending/index.md index 526e3dea..322c5047 100644 --- a/docs/extending/index.md +++ b/docs/extending/index.md @@ -6,17 +6,19 @@ both — pick the layer that matches what you're doing. ## Pick the right surface -| You want to… | What you'll write | -| --------------------------------------------------------------------------------------------- | ------------------------------------------------------------------ | -| Connect a new external system to the catalog (Kafka, Venice, MySQL, your-system). | A JDBC adapter + `TableTemplate` / `JobTemplate`. See [Data sources](data-sources.md). | -| Send Hoptimator-generated specs somewhere other than Kubernetes. | A `Deployer` + `DeployerProvider`. See [Deployers](deployers.md). | -| Reject SQL or YAML that's invalid in your environment before it deploys. | A `Validator` + `ValidatorProvider`. See [Validators](validators.md). | -| Pull configuration values from somewhere other than `hoptimator-configmap`. | A `ConfigProvider`. See [Config providers](config-providers.md). | -| Customize what gets deployed for an existing system. | Just a `TableTemplate` or `JobTemplate` — no Java needed. See [Templates and configuration](../kubernetes/templates.md). | +| You want to… | What you'll write | +|-----------------------------------------------------------------------------------------------------| ------------------------------------------------------------------ | +| Connect a new external system to the catalog (Kafka, Venice, MySQL, your-system). | A JDBC adapter + `TableTemplate` / `JobTemplate`. See [Data sources](data-sources.md). | +| Send Hoptimator-generated specs somewhere other than Kubernetes. | A `Deployer` + `DeployerProvider`. See [Deployers](deployers.md). | +| Reject SQL or YAML that's invalid in your environment before it deploys. | A `Validator` + `ValidatorProvider`. See [Validators](validators.md). | +| Pull configuration values from somewhere other than `hoptimator-configmap`. | A `ConfigProvider`. See [Config providers](config-providers.md). | +| Build a dependency graph from some backing store (e.g. K8s). | A `GraphProvider`. The K8s-backed default ships in `hoptimator-k8s`. | +| Render the dependency graph in a format other than Mermaid (DOT, JSON, an interactive web view, …). | A `GraphRenderer`. The Mermaid default ships in `hoptimator-graph`. | +| Customize what gets deployed for an existing system. | Just a `TableTemplate` or `JobTemplate` — no Java needed. See [Templates and configuration](../kubernetes/templates.md). | ## How extensions are loaded -All four extension points are loaded via Java's `ServiceLoader`. To register +All extension points are loaded via Java's `ServiceLoader`. To register an implementation, drop a service file under `src/main/resources/META-INF/services/` named after the SPI interface: @@ -26,6 +28,8 @@ META-INF/services/com.linkedin.hoptimator.ValidatorProvider META-INF/services/com.linkedin.hoptimator.ConfigProvider META-INF/services/com.linkedin.hoptimator.ConnectorProvider META-INF/services/com.linkedin.hoptimator.CatalogProvider +META-INF/services/com.linkedin.hoptimator.graph.GraphProvider +META-INF/services/com.linkedin.hoptimator.graph.GraphRenderer ``` Each file contains the fully qualified class name(s) of your @@ -66,6 +70,22 @@ mutation, and the SQL/YAML is rejected if a validator returns errors. Common uses: naming conventions, schema compatibility, ACL checks. See [Validators](validators.md). +### "I want to visualize what's deployed differently" + +The `!graph` CLI command (see +[SQL CLI → !graph](../user-guide/sql-cli.md#graph-identifier---depth-n)) +goes through two SPIs: `GraphProvider` builds the typed +`PipelineGraph` from some backing store, and `GraphRenderer` serializes +it to a string. The bundled defaults are a K8s-backed +`K8sGraphProvider` (in `hoptimator-k8s`) and a Mermaid `MermaidRenderer` +(in `hoptimator-graph`). + +Add a `GraphRenderer` to support a new output format (e.g. DOT for +graphviz, a JSON shape for a web UI). Add a `GraphProvider` if the +pipeline state lives somewhere other than Kubernetes — the K8s +implementation is the reference. Both register via `META-INF/services` +like every other SPI here. + ## Register, then test After dropping a service file and a class, the standard verification path diff --git a/docs/getting-started/architecture.md b/docs/getting-started/architecture.md index 5a4fd65a..3efed3df 100644 --- a/docs/getting-started/architecture.md +++ b/docs/getting-started/architecture.md @@ -142,24 +142,25 @@ the pipeline. The repo is split into focused modules. The ones you'll touch most often: -| Module | Role | -| --------------------------------- | -------------------------------------------------------------------- | -| `hoptimator-api` | The interfaces. `Deployer`, `Engine`, `Connector`, `View`, etc. | -| `hoptimator-jdbc` | Calcite-based JDBC driver. Catalog, parser, planner integration. | -| `hoptimator-jdbc-driver` | Lightweight wrapper that exposes the driver to standard JDBC code. | -| `hoptimator-cli` | The `./hoptimator` SQL CLI (sqlline + custom commands). | -| `hoptimator-mcp-server` | MCP server that wraps the JDBC driver for AI agents and IDEs. | -| `hoptimator-util` | Planner rules, deployment service, template engine. | -| `hoptimator-k8s` | Default Deployers, the catalog/operator glue, all CRDs. | -| `hoptimator-operator` | The reconciler loop and its main entry point. | -| `hoptimator-flink-runner` | The runtime that executes Flink SQL jobs produced by the planner. | -| `hoptimator-flink-adapter` | Flink-side adapter for catalog awareness. | -| `hoptimator-kafka` / `-kafka-controller` | Kafka catalog and controller integration. | -| `hoptimator-venice` | Venice catalog adapter. | -| `hoptimator-mysql` | MySQL catalog adapter. | -| `hoptimator-logical` | LogicalTable support — one logical entity, multiple physical tiers. | -| `hoptimator-demodb` | In-memory demo source used by the quickstart. | -| `hoptimator-avro` | Avro schema utilities used by the catalog/connectors. | +| Module | Role | +|------------------------------------------|---------------------------------------------------------------------| +| `hoptimator-api` | The interfaces. `Deployer`, `Engine`, `Connector`, `View`, etc. | +| `hoptimator-jdbc` | Calcite-based JDBC driver. Catalog, parser, planner integration. | +| `hoptimator-jdbc-driver` | Lightweight wrapper that exposes the driver to standard JDBC code. | +| `hoptimator-cli` | The `./hoptimator` SQL CLI (sqlline + custom commands). | +| `hoptimator-mcp-server` | MCP server that wraps the JDBC driver for AI agents and IDEs. | +| `hoptimator-util` | Planner rules, deployment service, template engine. | +| `hoptimator-k8s` | Default Deployers, the catalog/operator glue, all CRDs. | +| `hoptimator-operator` | The reconciler loop and its main entry point. | +| `hoptimator-flink-runner` | The runtime that executes Flink SQL jobs produced by the planner. | +| `hoptimator-flink-adapter` | Flink-side adapter for catalog awareness. | +| `hoptimator-kafka` / `-kafka-controller` | Kafka catalog and controller integration. | +| `hoptimator-venice` | Venice catalog adapter. | +| `hoptimator-mysql` | MySQL catalog adapter. | +| `hoptimator-logical` | LogicalTable support — one logical entity, multiple physical tiers. | +| `hoptimator-graph` | Pipeline graph renderers. Ships the Mermaid backend for `!graph`. | +| `hoptimator-demodb` | In-memory demo source used by the quickstart. | +| `hoptimator-avro` | Avro schema utilities used by the catalog/connectors. | A handful of modules (`hoptimator-catalog`, `hoptimator-models`, `hoptimator-planner`) are in the tree but marked for deletion; new @@ -179,6 +180,11 @@ contributions should not target them. `DeployerProvider`. Kubernetes is the default but not a hard requirement; the bundled `K8sSourceDeployer` and `K8sJobDeployer` are themselves examples. Anything that knows how to materialize a spec will do. +- **A new visualization format or graph backend**: implement `GraphRenderer` + for a new output format (DOT, JSON, an interactive HTML view, …) or + `GraphProvider` to build the dependency graph from somewhere other than + Kubernetes. The bundled Mermaid renderer (`hoptimator-graph`) and K8s + provider (`hoptimator-k8s`) are reference impls. - **Different cluster configuration**: usually a `ConfigProvider` change rather than code. diff --git a/docs/getting-started/concepts.md b/docs/getting-started/concepts.md index c5dcca8b..4b2e6824 100644 --- a/docs/getting-started/concepts.md +++ b/docs/getting-started/concepts.md @@ -99,7 +99,7 @@ right bootstrap servers and topic name. Connectors are produced by the catalog adapter for each database, embedded in the YAML that [TableTemplates](#tabletemplates-and-jobtemplates) and [JobTemplates](#tabletemplates-and-jobtemplates) emit, and can be customized -via [hints](#hints). +via [hints](#configuration-and-hints). Connectors do not require an `Engine` to function. The typical flow is: Hoptimator generates a `FlinkSessionJob` (or similar) with the connector diff --git a/docs/index.md b/docs/index.md index a0cd92a6..c19b9739 100644 --- a/docs/index.md +++ b/docs/index.md @@ -23,7 +23,8 @@ Start with **[Getting started](getting-started/index.md)**: See the **[User guide](user-guide/index.md)**: - [SQL CLI](user-guide/sql-cli.md) — sqlline-based interactive shell with - `!pipeline`, `!specify`, `!resolve` for inspecting plans before they deploy. + `!pipeline`, `!specify`, `!resolve` for inspecting plans before they + deploy, and `!graph` for visualizing what's already running. - [JDBC driver](user-guide/jdbc.md) — `jdbc:hoptimator://` for Java apps, with full connection-property reference. - [MCP server](user-guide/mcp-server.md) — Model Context Protocol server @@ -64,6 +65,8 @@ See **[Extending Hoptimator](extending/index.md)**: via `Validator` and `ValidatorProvider`. - [Config providers](extending/config-providers.md) — custom `ConfigProvider` SPI. +- [Pipeline graph SPIs](extending/index.md) — `GraphProvider` (alternate backing store) and + `GraphRenderer` (alternate output format). - [Templates and configuration](kubernetes/templates.md) — authoring `TableTemplate` and `JobTemplate` (lives in the Kubernetes guide). diff --git a/docs/user-guide/sql-cli.md b/docs/user-guide/sql-cli.md index 9f63320e..1eb39246 100644 --- a/docs/user-guide/sql-cli.md +++ b/docs/user-guide/sql-cli.md @@ -33,8 +33,8 @@ See [JDBC driver](jdbc.md) for the full URL syntax. ## Built-in commands Standard sqlline commands all work (`!help`, `!quit`, `!run`, `!record`, -…) along with the catalog-introspection ones below. Hoptimator adds the -last four for inspecting plans and pipelines. +…) along with the catalog-introspection ones below. Hoptimator also adds +commands to inspect plans, pipelines, and the deployed graph. | Command | What it does | | ------------- | ----------------------------------------------------------------------------- | @@ -44,9 +44,11 @@ last four for inspecting plans and pipelines. | `!resolve` | Print the schema and source/sink connector configs Hoptimator would use for a table. | | `!pipeline` | Print the auto-generated pipeline SQL for a SELECT or CREATE MATERIALIZED VIEW statement. | | `!specify` | Print every Kubernetes spec the statement would deploy. The dry-run for `CREATE MATERIALIZED VIEW`. | +| `!graph` | Render the deployed dependency graph rooted at an identifier as a Mermaid diagram. | -`!resolve`, `!pipeline`, and `!specify` do not modify any state. Use them to -sanity-check a plan before you let the JDBC driver actually deploy it. +`!resolve`, `!pipeline`, `!specify`, and `!graph` do not modify any state. +Use them to sanity-check a plan before you let the JDBC driver actually +deploy it (and to inspect what's already running). ### `!resolve ` @@ -115,6 +117,61 @@ If you'd `kubectl apply` the output, you'd get the same result as actually running the `CREATE MATERIALIZED VIEW`. This is the safest way to review what a statement will do before you run it. +### `!graph [--depth N]` + +```sql +0: Hoptimator> !graph ADS.AUDIENCE +flowchart LR + subgraph n0["Materialized View"] + n1[/"ads-audience +kind: SqlJob +engine: Flink +mode: Streaming"/] +end + n2[("ADS.PAGE_VIEWS")] + n3[("PROFILE.MEMBERS")] + n4[("ADS.AUDIENCE")] + n2 --> n1 + n3 --> n1 + n1 --> n4 +``` +Rendered: +```mermaid +flowchart LR + subgraph n0["Materialized View"] + n1[/"ads-audience +kind: SqlJob +engine: Flink +mode: Streaming"/] + end + n2[("ADS.PAGE_VIEWS")] + n3[("PROFILE.MEMBERS")] + n4[("ADS.AUDIENCE")] + n2 --> n1 + n3 --> n1 + n1 --> n4 +``` + +Renders the deployed dependency graph rooted at `` as Mermaid. +Identifier resolution runs against Calcite's catalog, so the same names you use in SQL work here: + +- A materialized view (`ADS.AUDIENCE`) renders the view's compiled pipeline + with its direct sources and sink. +- A logical table (`LOGICAL.events`) renders the inter-tier pipelines, + any owned triggers, and the per-tier physical resources grouped into + tier subgraphs. +- A physical resource (`KAFKA.events`) traverses the depends-on dependency + index up to `--depth` hops in each direction — pipelines that read or + write it, and recursively their other endpoints. + +`--depth N` only applies to physical-resource targets; view and logical-table +graphs are intentionally single-hop ("what this view does," not the full +upstream chain). For the chain, run `!graph` on a source identifier. + +Rendering backends are pluggable. Mermaid is the default and the only one +shipped today; additional renderers can register via the +`GraphRenderer` SPI — see [Extending Hoptimator](../extending/index.md). + ## Running SQL Hoptimator supports the SQL surface described in diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphEdge.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphEdge.java new file mode 100644 index 00000000..78544374 --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphEdge.java @@ -0,0 +1,64 @@ +package com.linkedin.hoptimator.graph; + +import java.util.Objects; + + +/** + * A directed edge in a pipeline visualization graph. Edges are equal when their endpoints and + * type match — two edges of different types between the same nodes are distinct (e.g. an + * {@code ownerOf} relationship coexists with a {@code dependsOnSink} relationship). + */ +public final class GraphEdge { + + public enum Type { + /** {@code metadata.ownerReferences} cascade — drives subgraph membership, not arrows. */ + OWNER_OF, + /** Resource → pipeline (or job) edge derived from the {@code depends-on} annotation. */ + DEPENDS_ON_SOURCE, + /** Pipeline (or job) → resource edge derived from the {@code depends-on} annotation. */ + DEPENDS_ON_SINK, + /** Trigger → job/pipeline; rendered as a dotted line. */ + TRIGGERS + } + + private final GraphNode from; + private final GraphNode to; + private final Type type; + + public GraphEdge(GraphNode from, GraphNode to, Type type) { + this.from = Objects.requireNonNull(from, "from"); + this.to = Objects.requireNonNull(to, "to"); + this.type = Objects.requireNonNull(type, "type"); + } + + public GraphNode from() { + return from; + } + + public GraphNode to() { + return to; + } + + public Type type() { + return type; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof GraphEdge)) { + return false; + } + GraphEdge other = (GraphEdge) o; + return type == other.type && from.equals(other.from) && to.equals(other.to); + } + + @Override + public int hashCode() { + return Objects.hash(from, to, type); + } + + @Override + public String toString() { + return from.id() + " --" + type + "--> " + to.id(); + } +} diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphNode.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphNode.java new file mode 100644 index 00000000..235c2fef --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphNode.java @@ -0,0 +1,240 @@ +package com.linkedin.hoptimator.graph; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Map; +import java.util.Objects; + + +/** + * A node in a pipeline visualization graph. + * + *

Concrete node types are nested final subclasses keyed off a {@link Kind} enum so renderers + * can dispatch on kind without {@code instanceof} chains. Node identity for set/edge purposes is + * determined by {@link #id()} — typically a stable string built from the resource's name, or + * for {@link External} nodes from the database + path identifier. + */ +public abstract class GraphNode { + + public enum Kind { + PIPELINE, + VIEW, + LOGICAL_TABLE, + TRIGGER, + EXTERNAL + } + + private final Kind kind; + private final String id; + + private GraphNode(Kind kind, String id) { + this.kind = Objects.requireNonNull(kind, "kind"); + this.id = Objects.requireNonNull(id, "id"); + } + + public final Kind kind() { + return kind; + } + + public final String id() { + return id; + } + + /** Human-readable label used by renderers as the primary node text. */ + public abstract String displayName(); + + @Override + public final boolean equals(Object o) { + return o instanceof GraphNode && id.equals(((GraphNode) o).id); + } + + @Override + public final int hashCode() { + return id.hashCode(); + } + + @Override + public String toString() { + return kind + "[" + id + "]"; + } + + /** + * A deployed pipeline — the job that materializes a query from sources to a sink. Carries + * optional {@code jobKind} and {@code engine} descriptors extracted from the underlying job + * artifact and surfaced inline in the rendered label so users can tell at a glance whether a + * pipeline runs e.g. on Flink vs a Beam runner. + */ + public static final class Pipeline extends GraphNode { + private final String name; + private final String jobKind; + private final String engine; + private final String executionMode; + + public Pipeline(String name, @Nullable String jobKind, @Nullable String engine, + @Nullable String executionMode) { + super(Kind.PIPELINE, "pipeline:" + name); + this.name = Objects.requireNonNull(name, "name"); + this.jobKind = jobKind; + this.engine = engine; + this.executionMode = executionMode; + } + + public String name() { + return name; + } + + /** Type of the underlying job artifact (e.g. {@code SqlJob}, {@code FlinkSessionJob}). */ + public @Nullable String jobKind() { + return jobKind; + } + + /** + * Execution engine. For a {@code SqlJob}, this is {@code spec.dialect} (e.g. {@code Flink}, + * {@code FlinkBeam}). For other job kinds, inferred from the kind name. + */ + public @Nullable String engine() { + return engine; + } + + /** + * {@code SqlJob} {@code spec.executionMode} — {@code Streaming} or {@code Batch}. Null for + * non-SqlJob kinds where execution mode isn't a first-class spec field. + */ + public @Nullable String executionMode() { + return executionMode; + } + + @Override + public String displayName() { + return name; + } + } + + /** A view — regular or materialized. */ + public static final class View extends GraphNode { + private final String name; + private final boolean materialized; + + public View(String name, boolean materialized) { + super(Kind.VIEW, "view:" + name); + this.name = Objects.requireNonNull(name, "name"); + this.materialized = materialized; + } + + public String name() { + return name; + } + + public boolean materialized() { + return materialized; + } + + @Override + public String displayName() { + return name; + } + } + + /** A logical table. Tier names preserve insertion order. */ + public static final class LogicalTable extends GraphNode { + private final String name; + private final Map tiers; + + public LogicalTable(String name, Map tiers) { + super(Kind.LOGICAL_TABLE, "logicaltable:" + name); + this.name = Objects.requireNonNull(name, "name"); + this.tiers = Objects.requireNonNull(tiers, "tiers"); + } + + public String name() { + return name; + } + + /** Tier name → backing database identifier (e.g. {@code nearline → kafka-db}). */ + public Map tiers() { + return tiers; + } + + @Override + public String displayName() { + return name; + } + } + + /** + * A trigger — a scheduled or event-driven job that fires against its target. Carries optional + * rendering hints (container name, job-template binding) pulled from the trigger's underlying + * job declaration. Renderers display whichever hints are present so the user can tell at a + * glance what the underlying job is doing. + */ + public static final class Trigger extends GraphNode { + private final String name; + private final String schedule; + private final boolean paused; + private final String jobTemplateName; + private final String containerName; + + public Trigger(String name, @Nullable String schedule, boolean paused, + @Nullable String jobTemplateName, @Nullable String containerName) { + super(Kind.TRIGGER, "trigger:" + name); + this.name = Objects.requireNonNull(name, "name"); + this.schedule = schedule; + this.paused = paused; + this.jobTemplateName = jobTemplateName; + this.containerName = containerName; + } + + public String name() { + return name; + } + + /** Cron expression; null if the trigger isn't schedule-driven. */ + public @Nullable String schedule() { + return schedule; + } + + public boolean paused() { + return paused; + } + + /** Job-template name this trigger was rendered from (e.g. {@code retl-job-template}); null when unknown. */ + public @Nullable String jobTemplateName() { + return jobTemplateName; + } + + /** First container's name in the rendered job spec. */ + public @Nullable String containerName() { + return containerName; + } + + @Override + public String displayName() { + return name; + } + } + + /** A resource managed outside Hoptimator (Kafka topic, Venice store, MySQL table, etc.). */ + public static final class External extends GraphNode { + private final String database; + private final List path; + + public External(String database, List path) { + super(Kind.EXTERNAL, "external:" + database + "/" + String.join(".", path)); + this.database = Objects.requireNonNull(database, "database"); + this.path = Objects.requireNonNull(path, "path"); + } + + public String database() { + return database; + } + + public List path() { + return path; + } + + @Override + public String displayName() { + return String.join(".", path); + } + } +} diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphProvider.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphProvider.java new file mode 100644 index 00000000..9452d824 --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphProvider.java @@ -0,0 +1,22 @@ +package com.linkedin.hoptimator.graph; + +import java.sql.Connection; +import java.sql.SQLException; + + +/** + * SPI for backends that can materialize a {@link PipelineGraph} for a given {@link GraphTarget}. + * Discovered at runtime via {@code ServiceLoader} — implementations register themselves through + * a {@code META-INF/services/com.linkedin.hoptimator.graph.GraphProvider} file in their module. + */ +public interface GraphProvider { + + /** + * Build a graph for the given target. Should only be called when {@link #supports(GraphTarget)} + * returns {@code true} for the same target. + */ + PipelineGraph forTarget(GraphTarget target, int depth, Connection connection) throws SQLException; + + /** Whether this provider can handle the given target kind. */ + boolean supports(GraphTarget target); +} diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphRenderer.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphRenderer.java new file mode 100644 index 00000000..bb1ab90a --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphRenderer.java @@ -0,0 +1,17 @@ +package com.linkedin.hoptimator.graph; + +/** + * SPI for serializing a {@link PipelineGraph} to a string format (Mermaid, DOT, JSON, etc.). + * Discovered at runtime via {@code ServiceLoader}. + * + *

Each renderer declares its {@link #format()} (e.g. {@code "mermaid"}); the + * {@code GraphService.render(graph, format)} dispatcher picks the matching renderer. + */ +public interface GraphRenderer { + + /** Serialize the graph to a string in this renderer's format. */ + String render(PipelineGraph graph); + + /** Format identifier (e.g. {@code "mermaid"}, {@code "dot"}, {@code "json"}). */ + String format(); +} diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphTarget.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphTarget.java new file mode 100644 index 00000000..d092109b --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphTarget.java @@ -0,0 +1,88 @@ +package com.linkedin.hoptimator.graph; + +import java.util.List; +import java.util.Objects; + + +/** + * The thing a {@link GraphProvider} is asked to materialize a {@link PipelineGraph} around. + * + *

Abstract base + nested final subclasses, one per CLI mode: + *

    + *
  • {@link View} — graph rooted at a view (regular or materialized).
  • + *
  • {@link LogicalTable} — graph rooted at a logical table.
  • + *
  • {@link Resource} — reverse lookup against the dependency index, rooted at an external + * resource identified by {@code (database, path)}.
  • + *
+ * + *

Providers dispatch on the concrete subclass via {@code instanceof} to route to the right + * internal implementation. Same pattern {@link GraphNode} uses for its kinds. + */ +public abstract class GraphTarget { + + private GraphTarget() { + } + + /** A regular or materialized view, identified by its canonical name. */ + public static final class View extends GraphTarget { + private final String name; + + public View(String name) { + this.name = Objects.requireNonNull(name, "name"); + } + + public String name() { + return name; + } + + @Override + public String toString() { + return "View[" + name + "]"; + } + } + + /** A logical table identified by its canonical name. */ + public static final class LogicalTable extends GraphTarget { + private final String name; + + public LogicalTable(String name) { + this.name = Objects.requireNonNull(name, "name"); + } + + public String name() { + return name; + } + + @Override + public String toString() { + return "LogicalTable[" + name + "]"; + } + } + + /** + * An external resource (Kafka topic, Venice store, MySQL table, etc.) identified by its + * database name plus path components. Used for reverse lookups. + */ + public static final class Resource extends GraphTarget { + private final String database; + private final List path; + + public Resource(String database, List path) { + this.database = Objects.requireNonNull(database, "database"); + this.path = Objects.requireNonNull(path, "path"); + } + + public String database() { + return database; + } + + public List path() { + return path; + } + + @Override + public String toString() { + return "Resource[" + database + "/" + String.join(".", path) + "]"; + } + } +} diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/PipelineGraph.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/PipelineGraph.java new file mode 100644 index 00000000..5ed2a067 --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/PipelineGraph.java @@ -0,0 +1,44 @@ +package com.linkedin.hoptimator.graph; + +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Objects; +import java.util.Set; + + +/** + * A pipeline visualization graph: a {@link #root()} node the user asked about plus the surrounding + * nodes and edges discovered during traversal. + * + *

The graph is intentionally minimal — discovery lives in backend-specific + * {@link GraphProvider} implementations and rendering lives in {@link GraphRenderer} + * implementations. This POJO is the wire format between them. + */ +public final class PipelineGraph { + + private final GraphNode root; + private final Set nodes; + private final Set edges; + + public PipelineGraph(GraphNode root, Set nodes, Set edges) { + this.root = Objects.requireNonNull(root, "root"); + this.nodes = Collections.unmodifiableSet(new LinkedHashSet<>(Objects.requireNonNull(nodes, "nodes"))); + this.edges = Collections.unmodifiableSet(new LinkedHashSet<>(Objects.requireNonNull(edges, "edges"))); + if (!this.nodes.contains(root)) { + throw new IllegalArgumentException("root node " + root.id() + " is not in the node set"); + } + } + + /** The node the user asked the graph to be built around (the entity highlighted by renderers). */ + public GraphNode root() { + return root; + } + + public Set nodes() { + return nodes; + } + + public Set edges() { + return edges; + } +} diff --git a/hoptimator-api/src/test/java/com/linkedin/hoptimator/graph/GraphEdgeTest.java b/hoptimator-api/src/test/java/com/linkedin/hoptimator/graph/GraphEdgeTest.java new file mode 100644 index 00000000..c0be08e3 --- /dev/null +++ b/hoptimator-api/src/test/java/com/linkedin/hoptimator/graph/GraphEdgeTest.java @@ -0,0 +1,74 @@ +package com.linkedin.hoptimator.graph; + +import java.util.List; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +/** + * Equality semantics for {@link GraphEdge}. The graph stores edges in a {@code Set}; if two + * edges with the same endpoints + type aren't equal, we'd end up with duplicate arrows in + * the rendered output. + */ +class GraphEdgeTest { + + private static final GraphNode A = new GraphNode.External("db", List.of("a")); + private static final GraphNode B = new GraphNode.External("db", List.of("b")); + + @Test + void equalEdgesShareHashCode() { + GraphEdge e1 = new GraphEdge(A, B, GraphEdge.Type.DEPENDS_ON_SOURCE); + GraphEdge e2 = new GraphEdge(A, B, GraphEdge.Type.DEPENDS_ON_SOURCE); + + assertEquals(e1, e2); + assertEquals(e1.hashCode(), e2.hashCode()); + } + + @Test + void differentTypeMakesEdgesUnequal() { + GraphEdge source = new GraphEdge(A, B, GraphEdge.Type.DEPENDS_ON_SOURCE); + GraphEdge owner = new GraphEdge(A, B, GraphEdge.Type.OWNER_OF); + assertNotEquals(source, owner, + "an OWNER_OF and a DEPENDS_ON_SOURCE between the same nodes should coexist"); + } + + @Test + void differentEndpointsMakeEdgesUnequal() { + GraphEdge ab = new GraphEdge(A, B, GraphEdge.Type.TRIGGERS); + GraphEdge ba = new GraphEdge(B, A, GraphEdge.Type.TRIGGERS); + assertNotEquals(ab, ba, "edges are directed; A→B is distinct from B→A"); + } + + @Test + void notEqualToOtherTypes() { + GraphEdge e = new GraphEdge(A, B, GraphEdge.Type.TRIGGERS); + assertNotEquals("not an edge", e); + assertNotEquals(null, e); + } + + @Test + void constructorRejectsNullArguments() { + assertThrows(NullPointerException.class, + () -> new GraphEdge(null, B, GraphEdge.Type.TRIGGERS)); + assertThrows(NullPointerException.class, + () -> new GraphEdge(A, null, GraphEdge.Type.TRIGGERS)); + assertThrows(NullPointerException.class, + () -> new GraphEdge(A, B, null)); + } + + @Test + void toStringMentionsBothEndpointsAndType() { + GraphEdge e = new GraphEdge(A, B, GraphEdge.Type.TRIGGERS); + String s = e.toString(); + assertNotNull(s); + assertTrue(s.contains(A.id()), "toString should include from id: " + s); + assertTrue(s.contains(B.id()), "toString should include to id: " + s); + assertTrue(s.contains("TRIGGERS"), "toString should include type: " + s); + } +} diff --git a/hoptimator-cli/build.gradle b/hoptimator-cli/build.gradle index 8b2491b6..a3e7d579 100644 --- a/hoptimator-cli/build.gradle +++ b/hoptimator-cli/build.gradle @@ -9,6 +9,7 @@ dependencies { implementation project(':hoptimator-api') implementation project(':hoptimator-avro') implementation project(':hoptimator-demodb') + implementation project(':hoptimator-graph') implementation project(':hoptimator-jdbc') implementation project(':hoptimator-kafka') implementation project(':hoptimator-logical') diff --git a/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java b/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java index 85714bf0..df242d1b 100644 --- a/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java +++ b/hoptimator-cli/src/main/java/sqlline/HoptimatorAppConfig.java @@ -1,6 +1,10 @@ package sqlline; +import com.linkedin.hoptimator.graph.GraphNode; +import com.linkedin.hoptimator.graph.PipelineGraph; import com.linkedin.hoptimator.SqlDialect; +import com.linkedin.hoptimator.graph.mermaid.MermaidRenderer; +import com.linkedin.hoptimator.jdbc.GraphService; import com.linkedin.hoptimator.jdbc.HoptimatorConnection; import com.linkedin.hoptimator.jdbc.HoptimatorDdlUtils; import com.linkedin.hoptimator.jdbc.HoptimatorDriver; @@ -50,6 +54,7 @@ public Collection getCommandHandlers(SqlLine sqlline) { list.add(new PipelineCommandHandler(sqlline)); list.add(new ResolveCommandHandler(sqlline)); list.add(new SpecifyCommandHandler(sqlline)); + list.add(new GraphCommandHandler(sqlline)); return list; } @@ -296,6 +301,128 @@ public boolean echoToFile() { } } + /** + * Renders a Mermaid flowchart for a Hoptimator entity. Forms: + *

+   *   !graph <schema>.<table>                  // 2-level identifier (case-sensitive)
+   *   !graph <catalog>.<schema>.<table>  // 3-level identifier (case-sensitive)
+   * 
+ * Optional flag: {@code --depth N} (default 2). + */ + static final class GraphCommandHandler implements CommandHandler { + + private final SqlLine sqlline; + + GraphCommandHandler(SqlLine sqlline) { + this.sqlline = sqlline; + } + + @Override + public String getName() { + return "graph"; + } + + @Override + public List getNames() { + return Collections.singletonList(getName()); + } + + @Override + public String getHelpText() { + return "Render a Mermaid pipeline graph for a view, logical table, or physical resource."; + } + + @Override + public String matches(String line) { + if (startsWith(line, "!graph") || startsWith(line, "graph")) { + return line; + } + return null; + } + + @Override + public void execute(String line, DispatchCallback dispatchCallback) { + if (!(sqlline.getConnection() instanceof HoptimatorConnection)) { + sqlline.error("This connection doesn't support `!graph`."); + dispatchCallback.setToFailure(); + return; + } + String[] parts = line.trim().split("\\s+"); + // parts[0] = "!graph" (or "graph"); identifier mandatory. + if (parts.length < 2) { + sqlline.error("Usage: !graph [--depth N]"); + dispatchCallback.setToFailure(); + return; + } + String identifier = parts[1]; + int depth = 2; + // Walk through the remaining tokens. Flags consume their value (i += 1); anything else is + // an unknown positional and surfaces as an error rather than getting silently dropped. + int i = 2; + while (i < parts.length) { + if ("--depth".equals(parts[i])) { + if (i + 1 >= parts.length) { + sqlline.error("--depth requires an integer value"); + dispatchCallback.setToFailure(); + return; + } + try { + depth = Integer.parseInt(parts[i + 1]); + } catch (NumberFormatException e) { + sqlline.error("--depth requires an integer; got: " + parts[i + 1]); + dispatchCallback.setToFailure(); + return; + } + i += 2; + } else { + sqlline.error("Unknown argument to !graph: " + parts[i] + + ". Usage: !graph [--depth N]"); + dispatchCallback.setToFailure(); + return; + } + } + + HoptimatorConnection conn = (HoptimatorConnection) sqlline.getConnection(); + try { + PipelineGraph graph = GraphService.buildGraph(identifier, depth, conn); + sqlline.output(GraphService.render(graph, MermaidRenderer.FORMAT)); + // Resource targets root at an External node. A degenerate (root-only) Resource graph at + // depth >= 1 means the label-selector found nothing — the identifier resolved to a real + // schema, but no pipeline references it. Suppress the warning at depth <= 0 since the + // root-only output is the depth bound's effect, not absence of pipelines. + if (depth >= 1 && graph.root() instanceof GraphNode.External && isDegenerate(graph)) { + sqlline.output(degenerateGraphWarning()); + } + } catch (SQLException e) { + sqlline.error(e); + dispatchCallback.setToFailure(); + } + } + + /** A graph is degenerate when it contains only the root and no edges — typical of a typo or + * a not-yet-deployed resource. */ + static boolean isDegenerate(PipelineGraph graph) { + return graph.nodes().size() == 1 && graph.edges().isEmpty(); + } + + /** Mermaid comment line warning that the resource may not exist. Comment syntax keeps the + * output safe to pipe into a renderer. */ + static String degenerateGraphWarning() { + return "%% WARNING: no pipelines reference this resource — the identifier may not exist " + + "or no pipelines have been deployed against it yet."; + } + + @Override + public List getParameterCompleters() { + return Collections.emptyList(); + } + + @Override + public boolean echoToFile() { + return false; + } + } + private static final class IntroCommandHandler implements CommandHandler { private final SqlLine sqlline; diff --git a/hoptimator-cli/src/test/java/sqlline/GraphCommandHandlerTest.java b/hoptimator-cli/src/test/java/sqlline/GraphCommandHandlerTest.java new file mode 100644 index 00000000..f52889f8 --- /dev/null +++ b/hoptimator-cli/src/test/java/sqlline/GraphCommandHandlerTest.java @@ -0,0 +1,82 @@ +package sqlline; + +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +import org.junit.jupiter.api.Test; + +import com.linkedin.hoptimator.graph.GraphEdge; +import com.linkedin.hoptimator.graph.GraphNode; +import com.linkedin.hoptimator.graph.PipelineGraph; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +/** + * Unit tests for the static argument-parsing helpers on {@link HoptimatorAppConfig.GraphCommandHandler}. + * + *

Exercising the full {@code execute(...)} path is impractical without spinning up a sqlline + * shell, so the assertions here focus on the bits of logic that are most likely to regress. + */ +class GraphCommandHandlerTest { + + @Test + void isDegenerateTrueForRootOnlyGraph() { + GraphNode.External root = new GraphNode.External("db", List.of("table")); + Set nodes = singleton(root); + Set edges = empty(); + + PipelineGraph graph = new PipelineGraph(root, nodes, edges); + + assertTrue(HoptimatorAppConfig.GraphCommandHandler.isDegenerate(graph), + "single-node, zero-edge graph is degenerate"); + } + + @Test + void isDegenerateFalseWhenEdgesExist() { + GraphNode.External root = new GraphNode.External("db", List.of("a")); + GraphNode.Pipeline pipe = new GraphNode.Pipeline("p", null, null, null); + Set nodes = pair(root, pipe); + Set edges = oneEdge(new GraphEdge(root, pipe, GraphEdge.Type.DEPENDS_ON_SOURCE)); + + PipelineGraph graph = new PipelineGraph(root, nodes, edges); + + assertFalse(HoptimatorAppConfig.GraphCommandHandler.isDegenerate(graph), + "graph with at least one edge is not degenerate"); + } + + @Test + void degenerateGraphWarningStartsWithMermaidCommentSyntax() { + String warning = HoptimatorAppConfig.GraphCommandHandler.degenerateGraphWarning(); + + assertTrue(warning.startsWith("%% "), + "warning must use Mermaid comment syntax so renderers ignore it: " + warning); + assertTrue(warning.toLowerCase().contains("warning"), + "warning text should be self-evident: " + warning); + } + + private static Set singleton(GraphNode n) { + Set set = new LinkedHashSet<>(); + set.add(n); + return set; + } + + private static Set pair(GraphNode a, GraphNode b) { + Set set = new LinkedHashSet<>(); + set.add(a); + set.add(b); + return set; + } + + private static Set empty() { + return new LinkedHashSet<>(); + } + + private static Set oneEdge(GraphEdge edge) { + Set set = new LinkedHashSet<>(); + set.add(edge); + return set; + } +} diff --git a/hoptimator-graph/build.gradle b/hoptimator-graph/build.gradle new file mode 100644 index 00000000..a81af0e7 --- /dev/null +++ b/hoptimator-graph/build.gradle @@ -0,0 +1,54 @@ +plugins { + id 'java' + id 'maven-publish' +} + +dependencies { + implementation project(':hoptimator-api') + implementation libs.cron.utils +} + +publishing { + repositories { + maven { + name 'GitHubPackages' + url = 'https://maven.pkg.github.com/linkedin/Hoptimator' + credentials { + username = System.getenv('GITHUB_ACTOR') + password = System.getenv('GITHUB_TOKEN') + } + } + maven { + name 'LinkedInJFrog' + url 'https://linkedin.jfrog.io/artifactory/hoptimator' + credentials { + username = System.getenv('JFROG_USERNAME') + password = System.getenv('JFROG_API_KEY') + } + } + } + publications { + maven(MavenPublication) { + groupId = 'com.linkedin.hoptimator' + artifactId = 'hoptimator-graph' + version = System.getenv('VERSION') + from components.java + pom { + name = 'hoptimator-graph' + description = 'Pipeline graph rendering — SPI-driven providers and renderers' + url = 'https://github.com/linkedin/Hoptimator' + licenses { + license { + name = 'BSD 2-Clause' + url = 'https://raw.githubusercontent.com/linkedin/Hoptimator/main/LICENSE' + } + } + scm { + connection = 'scm:git:git://github.com:linkedin/Hoptimator.git' + developerConnection = 'scm:git:ssh://github.com:linkedin/Hoptimator.git' + url = 'https://github.com/linkedin/Hoptimator' + } + } + } + } +} diff --git a/hoptimator-graph/src/main/java/com/linkedin/hoptimator/graph/mermaid/CronHumanizer.java b/hoptimator-graph/src/main/java/com/linkedin/hoptimator/graph/mermaid/CronHumanizer.java new file mode 100644 index 00000000..a23870bd --- /dev/null +++ b/hoptimator-graph/src/main/java/com/linkedin/hoptimator/graph/mermaid/CronHumanizer.java @@ -0,0 +1,64 @@ +package com.linkedin.hoptimator.graph.mermaid; + +import java.util.Locale; + +import com.cronutils.descriptor.CronDescriptor; +import com.cronutils.model.definition.CronDefinition; +import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.parser.CronParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Renders a cron expression as English prose for the Mermaid Trigger node label. + * + *

Delegates to {@code cron-utils}'s {@link CronDescriptor}, which already understands the + * full grammar (ranges, lists, step values, nicknames). Anything {@link CronParser} rejects — + * malformed input, foreign dialects we don't model — falls back to the raw expression so the + * user still sees something useful instead of a wrong English phrase. + * + *

The cron grammar matches the one {@code TableTriggerReconciler} uses at runtime, so + * what the visualizer accepts and what the operator accepts don't drift. + */ +final class CronHumanizer { + + private static final Logger log = LoggerFactory.getLogger(CronHumanizer.class); + + private static final CronDefinition CRON_DEFINITION = CronDefinitionBuilder.defineCron() + .withMinutes().withValidRange(0, 59).withStrictRange().and() + .withHours().withValidRange(0, 23).withStrictRange().and() + .withDayOfMonth().withValidRange(1, 31).withStrictRange().and() + .withMonth().withValidRange(1, 12).withStrictRange().and() + .withDayOfWeek().withValidRange(0, 7).withMondayDoWValue(1).withIntMapping(7, 0).withStrictRange().and() + .withSupportedNicknameHourly() + .withSupportedNicknameDaily() + .withSupportedNicknameWeekly() + .withSupportedNicknameMonthly() + .withSupportedNicknameYearly() + .withSupportedNicknameAnnually() + .withSupportedNicknameMidnight() + .instance(); + + private static final CronParser PARSER = new CronParser(CRON_DEFINITION); + private static final CronDescriptor DESCRIPTOR = CronDescriptor.instance(Locale.US); + + private CronHumanizer() { + } + + /** Render a cron expression as English prose, or return the input unchanged when parsing fails. */ + static String humanize(String cron) { + if (cron == null || cron.isEmpty()) { + return cron; + } + try { + return DESCRIPTOR.describe(PARSER.parse(cron.trim())); + } catch (IllegalArgumentException e) { + // CronParser throws IllegalArgumentException for unrecognized expressions (Quartz 6-field + // forms, vendor-specific syntax, etc.). Fall back to the raw form rather than swallow the + // user's intent behind a wrong phrase. + log.warn("Unable to parse cron expression={}", cron, e); + return cron; + } + } +} diff --git a/hoptimator-graph/src/main/java/com/linkedin/hoptimator/graph/mermaid/MermaidRenderer.java b/hoptimator-graph/src/main/java/com/linkedin/hoptimator/graph/mermaid/MermaidRenderer.java new file mode 100644 index 00000000..2f4fd886 --- /dev/null +++ b/hoptimator-graph/src/main/java/com/linkedin/hoptimator/graph/mermaid/MermaidRenderer.java @@ -0,0 +1,289 @@ +package com.linkedin.hoptimator.graph.mermaid; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import com.linkedin.hoptimator.graph.GraphEdge; +import com.linkedin.hoptimator.graph.GraphNode; +import com.linkedin.hoptimator.graph.GraphRenderer; +import com.linkedin.hoptimator.graph.PipelineGraph; + + +/** + * {@link GraphRenderer} that serializes a {@link PipelineGraph} as a Mermaid {@code flowchart} + * string. Registered via {@code META-INF/services/com.linkedin.hoptimator.graph.GraphRenderer} so it's + * discovered through {@link com.linkedin.hoptimator.jdbc.GraphService}. + * + *

Visual encoding: + *

    + *
  • {@link GraphNode.External} — cylinder.
  • + *
  • {@link GraphNode.Pipeline} — parallelogram, with optional kind/engine inline.
  • + *
  • {@link GraphNode.View} — rectangle ({@code "Materialized View"} prefix when applicable).
  • + *
  • {@link GraphNode.LogicalTable} — top-level subgraph wrapper; tiers nest inside as subgraphs.
  • + *
  • {@link GraphNode.Trigger} — rhombus with cron + paused state + job hints.
  • + *
+ * + *

Edges: + *

    + *
  • {@link GraphEdge.Type#DEPENDS_ON_SOURCE}, {@link GraphEdge.Type#DEPENDS_ON_SINK} — solid arrow.
  • + *
  • {@link GraphEdge.Type#TRIGGERS} — dotted arrow.
  • + *
  • {@link GraphEdge.Type#OWNER_OF} — drives subgraph membership instead of an arrow.
  • + *
+ * + *

Orientation: {@code TD} for LogicalTable graphs (with {@code direction LR} inside the LT + * subgraph so inter-tier flows still read left-to-right); {@code LR} otherwise. + */ +public final class MermaidRenderer implements GraphRenderer { + + public static final String FORMAT = "mermaid"; + + @Override + public String format() { + return FORMAT; + } + + @Override + public String render(PipelineGraph graph) { + StringBuilder sb = new StringBuilder(); + sb.append("flowchart ").append(orientation(graph)).append("\n"); + + // Stable mermaid IDs (n0, n1, ...) keyed off node identity. + Map mermaidIds = assignIds(graph); + + // OWNER_OF edges drive subgraph membership; collect them so we can group children inside + // their owner. + Map> ownedChildren = new LinkedHashMap<>(); + Set ownedNodes = new LinkedHashSet<>(); + for (GraphEdge e : graph.edges()) { + if (e.type() == GraphEdge.Type.OWNER_OF) { + ownedChildren.computeIfAbsent(e.from(), k -> new LinkedHashSet<>()).add(e.to()); + ownedNodes.add(e.to()); + } + } + + // Render nodes — owners get a subgraph wrapper, owned children render inside it, free + // nodes render at the top level. + Set rendered = new LinkedHashSet<>(); + for (GraphNode node : graph.nodes()) { + if (rendered.contains(node) || ownedNodes.contains(node)) { + continue; + } + if (node.kind() == GraphNode.Kind.LOGICAL_TABLE) { + renderLogicalTableSubgraph(sb, (GraphNode.LogicalTable) node, ownedChildren, mermaidIds, rendered, " "); + } else if (ownedChildren.containsKey(node)) { + renderOwnerSubgraph(sb, node, ownedChildren, mermaidIds, rendered, " "); + } else { + sb.append(" ").append(renderNode(node, mermaidIds)).append("\n"); + rendered.add(node); + } + } + + // Render arrows (everything except OWNER_OF). + for (GraphEdge e : graph.edges()) { + if (e.type() == GraphEdge.Type.OWNER_OF) { + continue; + } + sb.append(" ").append(mermaidIds.get(e.from())) + .append(arrow(e.type())) + .append(mermaidIds.get(e.to())).append("\n"); + } + return sb.toString(); + } + + // ─── Subgraph rendering ────────────────────────────────────────────────── + + private static void renderLogicalTableSubgraph(StringBuilder sb, GraphNode.LogicalTable lt, + Map> ownedChildren, Map ids, + Set rendered, String indent) { + String ltId = ids.get(lt); + // LT wrapper carries both kind and name — none of the inner tier subgraphs or owned + // children show the LT name, so it has to live here. (Contrast with View wrappers: the + // inner pipeline node duplicates the View name, so the wrapper drops the name.) + sb.append(indent).append("subgraph ").append(ltId).append("[\"LogicalTable ") + .append(escape(lt.displayName())).append("\"]\n"); + sb.append(indent).append(" direction LR\n"); + rendered.add(lt); + + // Group LogicalTable's owned children into tier subgraphs based on the tier-database match. + Map tiers = lt.tiers(); + Map> nodesByTier = new LinkedHashMap<>(); + Set nonTierChildren = new LinkedHashSet<>(); + Set children = ownedChildren.getOrDefault(lt, new LinkedHashSet<>()); + + for (GraphNode child : children) { + String tier = tierFor(child, tiers); + if (tier != null) { + nodesByTier.computeIfAbsent(tier, k -> new LinkedHashSet<>()).add(child); + } else { + nonTierChildren.add(child); + } + } + + // Emit a subgraph per tier, in the order the LogicalTable declares them. + for (String tier : tiers.keySet()) { + Set tierNodes = nodesByTier.get(tier); + if (tierNodes == null) { + continue; + } + sb.append(indent).append(" subgraph ").append(safeId(tier)) + .append("[\"").append(escape(tier)).append("\"]\n"); + for (GraphNode n : tierNodes) { + sb.append(indent).append(" ").append(renderNode(n, ids)).append("\n"); + rendered.add(n); + } + sb.append(indent).append(" end\n"); + } + + // Anything else owned by the LogicalTable (Pipelines, Triggers) — flat inside the wrapper. + for (GraphNode n : nonTierChildren) { + sb.append(indent).append(" ").append(renderNode(n, ids)).append("\n"); + rendered.add(n); + } + + sb.append(indent).append("end\n"); + } + + private static void renderOwnerSubgraph(StringBuilder sb, GraphNode owner, + Map> ownedChildren, Map ids, + Set rendered, String indent) { + // Mermaid rejects "Setting n0 as parent of n0" if a node id collides with the subgraph id. + // The owner is the subgraph (its display name is the subgraph title), so we don't emit it + // as a separate node — same convention LogicalTable already uses. + String ownerId = ids.get(owner); + sb.append(indent).append("subgraph ").append(ownerId).append("[\"") + .append(escape(subgraphTitle(owner))).append("\"]\n"); + rendered.add(owner); + for (GraphNode child : ownedChildren.get(owner)) { + sb.append(indent).append(" ").append(renderNode(child, ids)).append("\n"); + rendered.add(child); + } + sb.append(indent).append("end\n"); + } + + /** + * Subgraph title for an owner. Strips the resource name from {@link GraphNode.View}'s + * display string ("Materialized View foo" → "Materialized View") since the owned pipeline + * node already shows the name inside the subgraph — repeating it on the wrapper is noise. + * Other node kinds fall through to {@link GraphNode#displayName()} unchanged. + */ + private static String subgraphTitle(GraphNode owner) { + if (owner instanceof GraphNode.View) { + return ((GraphNode.View) owner).materialized() ? "Materialized View" : "View"; + } + return owner.displayName(); + } + + private static String tierFor(GraphNode child, Map tiers) { + if (!(child instanceof GraphNode.External)) { + return null; + } + String db = ((GraphNode.External) child).database(); + for (Map.Entry e : tiers.entrySet()) { + // Null-safe: LogicalTable tier maps can carry null databases for tiers declared without a + // resolved binding. Objects.equals treats null==null as a match (caller side) and avoids + // NPE when only one side is null. + if (Objects.equals(e.getValue(), db)) { + return e.getKey(); + } + } + return null; + } + + // ─── Per-node mermaid syntax ───────────────────────────────────────────── + + private static String renderNode(GraphNode node, Map ids) { + String id = ids.get(node); + switch (node.kind()) { + case EXTERNAL: { + GraphNode.External ext = (GraphNode.External) node; + return id + "[(\"" + escape(ext.displayName()) + "\")]"; + } + case PIPELINE: { + GraphNode.Pipeline p = (GraphNode.Pipeline) node; + StringBuilder lbl = new StringBuilder(p.displayName()); + if (p.jobKind() != null) { + lbl.append("
kind: ").append(p.jobKind()); + } + if (p.engine() != null) { + lbl.append("
engine: ").append(p.engine()); + } + if (p.executionMode() != null) { + lbl.append("
mode: ").append(p.executionMode()); + } + return id + "[/\"" + escape(lbl.toString()) + "\"/]"; + } + case VIEW: { + return id + "[\"" + escape(node.displayName()) + "\"]"; + } + case TRIGGER: { + GraphNode.Trigger t = (GraphNode.Trigger) node; + StringBuilder lbl = new StringBuilder(t.displayName()); + if (t.schedule() != null) { + lbl.append("
cron: ").append(CronHumanizer.humanize(t.schedule())); + } + if (t.jobTemplateName() != null) { + lbl.append("
template: ").append(t.jobTemplateName()); + } + if (t.containerName() != null) { + lbl.append("
container: ").append(t.containerName()); + } + if (t.paused()) { + lbl.append("
(paused)"); + } + return id + "{\"" + escape(lbl.toString()) + "\"}"; + } + case LOGICAL_TABLE: + // LogicalTable always renders as a subgraph wrapper, never as a flat node. + return id + "[\"" + escape(node.displayName()) + "\"]"; + default: + return id + "[\"" + escape(node.displayName()) + "\"]"; + } + } + + // ─── Edge syntax ───────────────────────────────────────────────────────── + + private static String arrow(GraphEdge.Type type) { + switch (type) { + case TRIGGERS: + return " -.-> "; + case DEPENDS_ON_SOURCE: + case DEPENDS_ON_SINK: + default: + return " --> "; + } + } + + // ─── Helpers ───────────────────────────────────────────────────────────── + + private static String orientation(PipelineGraph graph) { + return graph.root().kind() == GraphNode.Kind.LOGICAL_TABLE ? "TD" : "LR"; + } + + private static Map assignIds(PipelineGraph graph) { + Map ids = new HashMap<>(); + int i = 0; + for (GraphNode n : graph.nodes()) { + ids.put(n, "n" + i++); + } + return ids; + } + + private static String safeId(String raw) { + StringBuilder sb = new StringBuilder("s_"); + for (char c : raw.toCharArray()) { + sb.append(Character.isLetterOrDigit(c) ? c : '_'); + } + return sb.toString(); + } + + private static String escape(String s) { + if (s == null) { + return ""; + } + return s.replace("\"", """); + } +} diff --git a/hoptimator-graph/src/main/resources/META-INF/services/com.linkedin.hoptimator.graph.GraphRenderer b/hoptimator-graph/src/main/resources/META-INF/services/com.linkedin.hoptimator.graph.GraphRenderer new file mode 100644 index 00000000..43c171e3 --- /dev/null +++ b/hoptimator-graph/src/main/resources/META-INF/services/com.linkedin.hoptimator.graph.GraphRenderer @@ -0,0 +1 @@ +com.linkedin.hoptimator.graph.mermaid.MermaidRenderer diff --git a/hoptimator-graph/src/test/java/com/linkedin/hoptimator/graph/mermaid/CronHumanizerTest.java b/hoptimator-graph/src/test/java/com/linkedin/hoptimator/graph/mermaid/CronHumanizerTest.java new file mode 100644 index 00000000..d4c9ffe5 --- /dev/null +++ b/hoptimator-graph/src/test/java/com/linkedin/hoptimator/graph/mermaid/CronHumanizerTest.java @@ -0,0 +1,77 @@ +package com.linkedin.hoptimator.graph.mermaid; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +/** + * The bulk of the cron-grammar work happens inside {@code cron-utils}, so we don't re-test that + * library here. These cases pin the wrapper contract: standard 5-field expressions get translated + * to prose, and anything the parser rejects (Quartz 6-field, malformed input, null/empty) is + * returned unchanged so the user always sees something useful in the trigger label. + * + *

Substring assertions intentionally — the exact phrasing belongs to cron-utils and shouldn't + * lock us into a specific library version. + */ +class CronHumanizerTest { + + @Test + void everyMinute() { + String out = CronHumanizer.humanize("* * * * *"); + assertTrue(out.toLowerCase().contains("minute"), + "expected a humanized phrase mentioning 'minute', got: " + out); + } + + @Test + void everyNMinutes() { + String out = CronHumanizer.humanize("*/5 * * * *"); + assertTrue(out.contains("5") && out.toLowerCase().contains("minute"), + "expected '5' and 'minute' in: " + out); + } + + @Test + void everyNHours() { + String out = CronHumanizer.humanize("0 */6 * * *"); + assertTrue(out.contains("6") && out.toLowerCase().contains("hour"), + "expected '6' and 'hour' in: " + out); + } + + @Test + void daily() { + String out = CronHumanizer.humanize("0 0 * * *"); + // cron-utils renders this as "at 00:00" — no day-of-week qualifier when DOW is wildcard. + assertTrue(out.contains("00:00"), "expected '00:00' in: " + out); + } + + @Test + void weekly() { + String out = CronHumanizer.humanize("0 3 * * 1"); + assertTrue(out.contains("03:00") && out.toLowerCase().contains("monday"), + "expected '03:00' and 'Monday' in: " + out); + } + + @Test + void unparseableExpressionsFallThroughUnchanged() { + // Quartz 6-field — our CronDefinition is 5-field, so the parser rejects this. + assertEquals("0 0 0 * * ?", CronHumanizer.humanize("0 0 0 * * ?")); + // Garbage input — also rejected. + assertEquals("not a cron", CronHumanizer.humanize("not a cron")); + } + + @Test + void nicknamesParse() { + // CronDefinition opts in to @hourly / @daily / @weekly etc., so they should describe rather + // than fall through. We don't pin exact phrasing. + String hourly = CronHumanizer.humanize("@hourly"); + assertTrue(hourly.toLowerCase().contains("hour"), "expected 'hour' in: " + hourly); + } + + @Test + void nullAndEmptyPassThrough() { + assertNull(CronHumanizer.humanize(null)); + assertEquals("", CronHumanizer.humanize("")); + } +} diff --git a/hoptimator-graph/src/test/java/com/linkedin/hoptimator/graph/mermaid/MermaidRendererTest.java b/hoptimator-graph/src/test/java/com/linkedin/hoptimator/graph/mermaid/MermaidRendererTest.java new file mode 100644 index 00000000..1bfb2378 --- /dev/null +++ b/hoptimator-graph/src/test/java/com/linkedin/hoptimator/graph/mermaid/MermaidRendererTest.java @@ -0,0 +1,242 @@ +package com.linkedin.hoptimator.graph.mermaid; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.junit.jupiter.api.Test; + +import com.linkedin.hoptimator.graph.GraphEdge; +import com.linkedin.hoptimator.graph.GraphNode; +import com.linkedin.hoptimator.graph.PipelineGraph; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +/** + * Unit tests for {@link MermaidRenderer}. These exercise the rendering rules in isolation + * (hand-built {@link PipelineGraph} fixtures, no K8s involvement). + * + *

Assertions check structural invariants — orientation, subgraph membership, edge styles — + * rather than comparing the entire string verbatim. That keeps the tests resilient to whitespace + * tweaks while pinning down the contracts that drive readability. + */ +class MermaidRendererTest { + + @Test + void rendersMaterializedViewWithLrOrientation() { + GraphNode.View root = new GraphNode.View("audience", true); + GraphNode.External kafka = new GraphNode.External("kafka1", + Arrays.asList("KAFKA", "events")); + GraphNode.External venice = new GraphNode.External("venice-prod", + Arrays.asList("VENICE", "profile")); + GraphNode.Pipeline pipeline = new GraphNode.Pipeline("audience", null, null, null); + GraphNode.External sink = new GraphNode.External("ads", + Arrays.asList("VENICE", "audience")); + + Set nodes = setOf(root, kafka, venice, pipeline, sink); + Set edges = setOf( + new GraphEdge(root, pipeline, GraphEdge.Type.OWNER_OF), + new GraphEdge(kafka, pipeline, GraphEdge.Type.DEPENDS_ON_SOURCE), + new GraphEdge(venice, pipeline, GraphEdge.Type.DEPENDS_ON_SOURCE), + new GraphEdge(pipeline, sink, GraphEdge.Type.DEPENDS_ON_SINK)); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(root, nodes, edges)); + + assertTrue(mermaid.startsWith("flowchart LR"), + "Materialized view should render top-down LR: " + mermaid); + // OWNER_OF must drive the subgraph wrapper, not an arrow. + assertFalse(mermaid.contains("--OWNER_OF"), "OWNER_OF must not surface as an arrow"); + // The view's owned pipeline lives inside the View's subgraph wrapper. + assertTrue(mermaid.contains("subgraph "), "owners get subgraph wrappers"); + } + + @Test + void rendersLogicalTableTopDownWithTierSubgraphs() { + Map tiers = new LinkedHashMap<>(); + tiers.put("nearline", "kafka-db"); + tiers.put("online", "venice-db"); + tiers.put("offline", "hdfs-db"); + GraphNode.LogicalTable root = new GraphNode.LogicalTable("foo", tiers); + + GraphNode.External nearline = new GraphNode.External("kafka-db", + Arrays.asList("kafka-db", "foo")); + GraphNode.External online = new GraphNode.External("venice-db", + Arrays.asList("venice-db", "foo")); + GraphNode.External offline = new GraphNode.External("hdfs-db", + Arrays.asList("hdfs-db", "foo")); + GraphNode.Trigger trigger = new GraphNode.Trigger("foo-offline-trigger", + "0 */6 * * *", false, null, null); + + Set nodes = setOf(root, nearline, online, offline, trigger); + Set edges = setOf( + new GraphEdge(root, nearline, GraphEdge.Type.OWNER_OF), + new GraphEdge(root, online, GraphEdge.Type.OWNER_OF), + new GraphEdge(root, offline, GraphEdge.Type.OWNER_OF), + new GraphEdge(root, trigger, GraphEdge.Type.OWNER_OF), + new GraphEdge(trigger, offline, GraphEdge.Type.TRIGGERS)); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(root, nodes, edges)); + + assertTrue(mermaid.startsWith("flowchart TD"), + "LogicalTable should render TD outer, LR inner: " + mermaid); + assertTrue(mermaid.contains("direction LR"), + "LogicalTable subgraph should set inner direction LR: " + mermaid); + // Each tier should produce its own nested subgraph header. + assertTrue(mermaid.contains("nearline"), "nearline tier subgraph missing: " + mermaid); + assertTrue(mermaid.contains("online"), "online tier subgraph missing: " + mermaid); + assertTrue(mermaid.contains("offline"), "offline tier subgraph missing: " + mermaid); + // Trigger arrow uses dotted edge. + assertTrue(mermaid.contains(" -.-> "), "trigger edge should be dotted: " + mermaid); + // Don't pin the exact phrasing — that's cron-utils contract. Just verify the schedule + // makes it onto the trigger label in humanized form rather than the raw cron string. + assertTrue(mermaid.contains("cron: ") && mermaid.contains("6") && mermaid.contains("hour"), + "trigger label should expose the cron schedule humanized: " + mermaid); + } + + @Test + void rendersTriggerPausedSuffix() { + GraphNode.Trigger trigger = new GraphNode.Trigger("t1", "@hourly", true, null, null); + GraphNode.External target = new GraphNode.External("db", List.of("path")); + Set nodes = setOf(trigger, target); + Set edges = setOf(new GraphEdge(trigger, target, GraphEdge.Type.TRIGGERS)); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(trigger, nodes, edges)); + + assertTrue(mermaid.contains("(paused)"), + "paused trigger label should be marked: " + mermaid); + } + + @Test + void rendersJobKindAndEngineInsidePipelineLabel() { + // Pipeline carries optional jobKind/engine pulled from the underlying job artifact — those + // surface inline in the Pipeline node label so the user sees the execution shape at a glance. + GraphNode.Pipeline pipe = new GraphNode.Pipeline("p1", "FlinkDeployment", "Flink", null); + Set nodes = setOf(pipe); + Set edges = setOf(); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(pipe, nodes, edges)); + + assertTrue(mermaid.contains("kind: FlinkDeployment"), + "pipeline label missing jobKind: " + mermaid); + assertTrue(mermaid.contains("engine: Flink"), + "pipeline label missing engine: " + mermaid); + } + + @Test + void emptyGraphRendersOrientationOnly() { + GraphNode.View root = new GraphNode.View("v1", false); + Set nodes = setOf(root); + Set edges = setOf(); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(root, nodes, edges)); + + // Leaf View renders with just the name — rectangle shape conveys "view-ness". + assertEquals("flowchart LR\n n0[\"v1\"]\n", mermaid); + } + + @Test + void triggerWithoutScheduleOmitsCron() { + // Triggers may be event-driven (no cron). Label should not show a "cron:" prefix in that case. + GraphNode.Trigger trigger = new GraphNode.Trigger("t1", null, false, null, null); + GraphNode.External target = new GraphNode.External("db", List.of("path")); + Set nodes = setOf(trigger, target); + Set edges = setOf(new GraphEdge(trigger, target, GraphEdge.Type.TRIGGERS)); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(trigger, nodes, edges)); + + assertFalse(mermaid.contains("cron:"), "trigger without schedule should omit cron line: " + mermaid); + } + + @Test + void ownerSubgraphDoesNotEmitOwnerAsItsOwnChildNode() { + // Mermaid rejects "Setting n0 as parent of n0 would create a cycle" when a subgraph id + // collides with a node id inside it. Pin down the invariant: when an owner gets a subgraph + // wrapper, its mermaid id must appear only as the subgraph header — never as a node line. + GraphNode.View view = new GraphNode.View("audience", true); + GraphNode.Pipeline pipeline = new GraphNode.Pipeline("audience-pipe", null, null, null); + GraphNode.External sink = new GraphNode.External("ads", List.of("realized")); + + Set nodes = setOf(view, pipeline, sink); + Set edges = setOf( + new GraphEdge(view, pipeline, GraphEdge.Type.OWNER_OF), + new GraphEdge(pipeline, sink, GraphEdge.Type.DEPENDS_ON_SINK)); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(view, nodes, edges)); + + // n0 should appear once on the subgraph header line and never as a `n0[...]` node line. + int subgraphHeaders = countOccurrences(mermaid, "subgraph n0["); + int nodeLines = countOccurrences(mermaid, " n0[\""); + assertEquals(1, subgraphHeaders, "owner should be the subgraph header exactly once: " + mermaid); + assertEquals(0, nodeLines, "owner must not also appear as a node line inside its own subgraph: " + mermaid); + } + + @Test + void ownerSubgraphWrapsOwnedChildrenForNonLogicalTable() { + // When a non-LogicalTable node owns children (e.g. a View owning its realizing pipeline), + // the owner gets a subgraph wrapper with the children inside. + GraphNode.View view = new GraphNode.View("audience", true); + GraphNode.Pipeline pipeline = new GraphNode.Pipeline("audience-pipe", null, null, null); + GraphNode.External sink = new GraphNode.External("ads", List.of("realized")); + + Set nodes = setOf(view, pipeline, sink); + Set edges = setOf( + new GraphEdge(view, pipeline, GraphEdge.Type.OWNER_OF), + new GraphEdge(pipeline, sink, GraphEdge.Type.DEPENDS_ON_SINK)); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(view, nodes, edges)); + + assertTrue(mermaid.contains("subgraph "), + "View owning a pipeline should produce a subgraph wrapper: " + mermaid); + // Subgraph wrapper carries only the kind ("Materialized View") — the inner pipeline node + // already shows the resource name, so repeating it on the wrapper is just noise. + assertTrue(mermaid.contains("[\"Materialized View\"]"), + "View subgraph label should be just the kind without the name: " + mermaid); + assertTrue(mermaid.contains("audience"), + "the name should still appear (on the inner pipeline node): " + mermaid); + } + + @Test + void multipleSourcesAllConnectToTheSamePipeline() { + GraphNode.View root = new GraphNode.View("fanin", true); + GraphNode.Pipeline pipe = new GraphNode.Pipeline("fanin-pipe", null, null, null); + GraphNode.External a = new GraphNode.External("db1", List.of("a")); + GraphNode.External b = new GraphNode.External("db2", List.of("b")); + GraphNode.External c = new GraphNode.External("db3", List.of("c")); + + Set nodes = setOf(root, pipe, a, b, c); + Set edges = setOf( + new GraphEdge(root, pipe, GraphEdge.Type.OWNER_OF), + new GraphEdge(a, pipe, GraphEdge.Type.DEPENDS_ON_SOURCE), + new GraphEdge(b, pipe, GraphEdge.Type.DEPENDS_ON_SOURCE), + new GraphEdge(c, pipe, GraphEdge.Type.DEPENDS_ON_SOURCE)); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(root, nodes, edges)); + + int arrowCount = countOccurrences(mermaid, " --> "); + assertEquals(3, arrowCount, "three source arrows expected: " + mermaid); + } + + private static int countOccurrences(String haystack, String needle) { + int count = 0; + int idx = 0; + while ((idx = haystack.indexOf(needle, idx)) != -1) { + count++; + idx += needle.length(); + } + return count; + } + + @SafeVarargs + private static Set setOf(T... items) { + Set set = new LinkedHashSet<>(); + Collections.addAll(set, items); + return set; + } +} diff --git a/hoptimator-jdbc/build.gradle b/hoptimator-jdbc/build.gradle index 9d06d0e9..62ec7ea0 100644 --- a/hoptimator-jdbc/build.gradle +++ b/hoptimator-jdbc/build.gradle @@ -18,6 +18,7 @@ dependencies { testFixturesImplementation libs.quidem testFixturesImplementation libs.calcite.core testFixturesImplementation project(':hoptimator-api') + testFixturesImplementation project(':hoptimator-graph') testFixturesImplementation project(':hoptimator-util') testFixturesImplementation libs.junit.jupiter.api diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/GraphService.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/GraphService.java new file mode 100644 index 00000000..b0f8aa07 --- /dev/null +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/GraphService.java @@ -0,0 +1,158 @@ +package com.linkedin.hoptimator.jdbc; + +import java.sql.SQLException; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.List; +import java.util.ServiceLoader; +import java.util.stream.Collectors; + +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; + +import com.linkedin.hoptimator.Database; +import com.linkedin.hoptimator.graph.GraphProvider; +import com.linkedin.hoptimator.graph.GraphRenderer; +import com.linkedin.hoptimator.graph.GraphTarget; +import com.linkedin.hoptimator.graph.PipelineGraph; +import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema; +import org.apache.calcite.util.Util; + + +/** + * SPI dispatcher for {@link GraphProvider} (graph construction) and {@link GraphRenderer} + * (graph serialization). Mirrors the {@code DeploymentService} pattern: providers and + * renderers register via {@code META-INF/services}, callers go through static methods on + * this class to discover and dispatch. + */ +public final class GraphService { + + private GraphService() { + } + + /** + * Build a {@link PipelineGraph} for the given target by dispatching to the first registered + * {@link GraphProvider} that {@code supports(target)}. + * + * @throws SQLException if no provider supports the target, or if the underlying provider + * throws. + */ + public static PipelineGraph buildGraph(String identifier, int depth, HoptimatorConnection connection) + throws SQLException { + if (depth < 0) { + throw new SQLException("depth must be non-negative; got: " + depth); + } + GraphTarget target = resolve(identifier, connection); + for (GraphProvider provider : providers()) { + if (provider.supports(target)) { + return provider.forTarget(target, depth, connection); + } + } + throw new SQLException("No GraphProvider supports target: " + target + + ". Registered providers: " + providerSummary()); + } + + /** + * Walks the user-typed SQL identifier through Calcite's schema tree and produces the + * {@link GraphTarget} subtype that matches what was found: + * + *

    + *
  • {@link MaterializedViewTable} at the leaf → {@link GraphTarget.View}.
  • + *
  • Leaf inside a {@link HoptimatorJdbcSchema} whose downstream surfaces a + * {@code LogicalSchemaMarker}-tagged schema → {@link GraphTarget.LogicalTable}.
  • + *
  • Otherwise — leaf inside a physical {@link Database}-backed schema → + * {@link GraphTarget.Resource}. The graph downstream may still collapse to just the + * root if no pipeline references the resource — that's a legitimate "exists but + * unused" state and surfaces as a no-pipelines warning at render time.
  • + *
+ * + *

Throws {@link SQLException} for any path that can't be resolved: a schema segment + * that doesn't exist, a leaf schema that isn't backed by a Hoptimator {@link Database}, or + * a leaf name that no Table in the catalog matches. All three are real "you typed something + * this connection can't see" conditions, distinct from "table exists but has no pipelines." + * + *

LogicalTable detection is schema-level because the outer connection sees LogicalTables + * wrapped as generic JDBC adapter tables — the class identity is erased on the JDBC hop. + * {@code HoptimatorJdbcSchema} lazily walks its downstream connection to find the marker. + */ + static GraphTarget resolve(String identifier, HoptimatorConnection connection) throws SQLException { + List fullPath = Arrays.asList(identifier.split("\\.")); + SchemaPlus schema = connection.calciteConnection().getRootSchema(); + List schemaPath = Util.skipLast(fullPath); + for (String segment : schemaPath) { + SchemaPlus next = schema == null ? null : schema.subSchemas().get(segment); + if (next == null) { + throw new SQLException("Identifier " + identifier + + " does not exist: schema segment '" + segment + "' not found in this connection."); + } + schema = next; + } + + String leafName = Util.last(fullPath); + Table table = schema == null ? null : schema.tables().get(leafName); + if (table == null) { + throw new SQLException("Identifier " + identifier + + " does not exist: table '" + leafName + "' not found in schema " + + String.join(".", schemaPath) + "."); + } + if (table instanceof MaterializedViewTable) { + return new GraphTarget.View(identifier); + } + + HoptimatorJdbcSchema hjs = schema.unwrap(HoptimatorJdbcSchema.class); + if (hjs == null) { + throw new SQLException("Identifier " + identifier + + " does not resolve to a database in this connection."); + } + if (hjs.isLogical()) { + return new GraphTarget.LogicalTable(identifier); + } + return new GraphTarget.Resource(hjs.databaseName(), fullPath); + } + + /** + * Render a graph using the registered {@link GraphRenderer} whose {@link GraphRenderer#format()} + * matches {@code format} (case-insensitive). + * + * @throws IllegalArgumentException if no renderer supports {@code format}. + */ + public static String render(PipelineGraph graph, String format) { + for (GraphRenderer renderer : renderers()) { + if (renderer.format().equalsIgnoreCase(format)) { + return renderer.render(graph); + } + } + throw new IllegalArgumentException("No GraphRenderer registered for format: " + format + + ". Available: " + availableFormats()); + } + + /** Format identifiers (e.g. {@code mermaid}) registered by visible {@link GraphRenderer}s. */ + public static List availableFormats() { + return renderers().stream() + .map(GraphRenderer::format) + .distinct() + .collect(Collectors.toList()); + } + + // ─── SPI loading ───────────────────────────────────────────────────────── + + private static List providers() { + List providers = new ArrayList<>(); + ServiceLoader.load(GraphProvider.class).iterator().forEachRemaining(providers::add); + return providers; + } + + private static List renderers() { + List renderers = new ArrayList<>(); + ServiceLoader.load(GraphRenderer.class).iterator().forEachRemaining(renderers::add); + return renderers; + } + + private static String providerSummary() { + List names = new ArrayList<>(); + for (GraphProvider p : providers()) { + names.add(p.getClass().getSimpleName()); + } + return names.isEmpty() ? "" : String.join(", ", names); + } +} diff --git a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/GraphServiceTest.java b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/GraphServiceTest.java new file mode 100644 index 00000000..8274f41a --- /dev/null +++ b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/GraphServiceTest.java @@ -0,0 +1,285 @@ +package com.linkedin.hoptimator.jdbc; + +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.schema.impl.ViewTable; +import org.apache.calcite.schema.lookup.Lookup; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.linkedin.hoptimator.graph.GraphNode; +import com.linkedin.hoptimator.graph.GraphTarget; +import com.linkedin.hoptimator.graph.PipelineGraph; +import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; + + +/** + * Unit tests for {@link GraphService}. Covers SPI discovery (renderer registration via + * {@code META-INF/services}) and the resolver that walks Calcite's schema tree to figure out + * what kind of entity a SQL identifier refers to. + */ +@ExtendWith(MockitoExtension.class) +class GraphServiceTest { + + @Mock + private HoptimatorConnection connection; + @Mock + private CalciteConnection calciteConnection; + + // ─── SPI discovery ─────────────────────────────────────────────────────── + + @Test + void availableFormatsIncludesMermaid() { + List formats = GraphService.availableFormats(); + assertTrue(formats.contains("mermaid"), + "MermaidRenderer should be discoverable via ServiceLoader: " + formats); + } + + @Test + void renderUnknownFormatThrowsWithHelpfulMessage() { + GraphNode root = new GraphNode.External("db", List.of("t")); + PipelineGraph graph = new PipelineGraph(root, Set.of(root), Set.of()); + + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, + () -> GraphService.render(graph, "definitely-not-a-format")); + assertTrue(ex.getMessage().contains("definitely-not-a-format"), + "error should mention the unknown format: " + ex.getMessage()); + assertTrue(ex.getMessage().contains("Available"), + "error should list available formats: " + ex.getMessage()); + } + + // ─── Identifier resolution via Calcite ──────────────────────────────────── + + @Test + void resolveTwoLevelResourceFindsDatabaseCrdName() throws SQLException { + // User typed ADS.AD_CLICKS. Schema ADS is a Database in the connection with CRD name ads-database, + // and AD_CLICKS is a real table in its catalog (a plain physical table, not a view / LogicalTable). + SchemaPlus ads = schemaWithDatabaseAndTable("ads-database", "AD_CLICKS", plainTable()); + stubRootSchema(schemaWithSubs("ADS", ads)); + + GraphTarget.Resource out = (GraphTarget.Resource) GraphService.resolve("ADS.AD_CLICKS", connection); + + assertEquals("ads-database", out.database()); + assertEquals(Arrays.asList("ADS", "AD_CLICKS"), out.path()); + } + + @Test + void resolveThreeLevelResourceWalksCatalogAndSchema() throws SQLException { + // User typed MYSQL.testdb.orders. MYSQL is the catalog sub-schema; testdb is the Database; + // orders is a real physical table in the catalog. + SchemaPlus testdb = schemaWithDatabaseAndTable("mysql", "orders", plainTable()); + SchemaPlus mysqlCatalog = schemaWithSubs("testdb", testdb); + stubRootSchema(schemaWithSubs("MYSQL", mysqlCatalog)); + + GraphTarget.Resource out = (GraphTarget.Resource) GraphService.resolve( + "MYSQL.testdb.orders", connection); + + assertEquals("mysql", out.database()); + assertEquals(Arrays.asList("MYSQL", "testdb", "orders"), out.path()); + } + + @Test + void resolveUnknownTableInKnownSchemaThrowsSqlException() { + // Schema ADS resolves to a Database, but no AD_CLICKS table exists in the catalog. + // Distinct from "schema doesn't exist": this is "you typo'd the table name", which is + // different from "table exists but no pipeline references it" (the no-pipelines path, + // which doesn't reach this codepath — it's render-time). + SchemaPlus ads = schemaWithDatabaseAndTable("ads-database", /*tableName=*/null, /*table=*/null); + stubRootSchema(schemaWithSubs("ADS", ads)); + + SQLException ex = assertThrows(SQLException.class, + () -> GraphService.resolve("ADS.AD_CLICKS", connection)); + assertTrue(ex.getMessage().contains("ADS.AD_CLICKS"), + "error should name the offending identifier: " + ex.getMessage()); + assertTrue(ex.getMessage().contains("AD_CLICKS"), + "error should pinpoint the missing table name: " + ex.getMessage()); + } + + @Test + void resolveMaterializedViewProducesViewTarget() throws SQLException { + // The leaf table is a MaterializedViewTable — kind detection returns GraphTarget.View. + MaterializedViewTable mv = new MaterializedViewTable(mock(ViewTable.class)); + SchemaPlus profile = schemaWithDatabaseAndTable("profile-database", "AUDIENCE", mv); + stubRootSchema(schemaWithSubs("PROFILE", profile)); + + GraphTarget out = GraphService.resolve("PROFILE.AUDIENCE", connection); + + GraphTarget.View view = assertInstanceOf(GraphTarget.View.class, out); + assertEquals("PROFILE.AUDIENCE", view.name(), + "View target carries the SQL-side identifier; the builder canonicalizes downstream"); + } + + @Test + void resolveLogicalTableProducesLogicalTableTarget() throws SQLException { + // The schema unwraps to a HoptimatorJdbcSchema whose isLogical() is true — detection runs at + // the schema level because the outer JDBC adapter wraps tables and erases the LogicalTable + // class identity. + SchemaPlus logical = schemaWithDatabaseAndTable("logical-database", "MEMBERS", plainTable(), + /*isLogical=*/true); + stubRootSchema(schemaWithSubs("LOGICAL", logical)); + + GraphTarget out = GraphService.resolve("LOGICAL.MEMBERS", connection); + + GraphTarget.LogicalTable ltTarget = assertInstanceOf(GraphTarget.LogicalTable.class, out); + assertEquals("LOGICAL.MEMBERS", ltTarget.name()); + } + + @Test + void resolveUnknownSchemaSegmentThrowsSqlException() { + // User typed UNKNOWN.FOO. Root has no UNKNOWN sub-schema — bail with a clear error. + stubRootSchema(schemaWithSubs("UNKNOWN", null)); + + SQLException ex = assertThrows(SQLException.class, + () -> GraphService.resolve("UNKNOWN.FOO", connection)); + assertTrue(ex.getMessage().contains("UNKNOWN.FOO"), + "error should name the offending identifier: " + ex.getMessage()); + assertTrue(ex.getMessage().contains("UNKNOWN"), + "error should pinpoint which segment failed: " + ex.getMessage()); + } + + @Test + void resolveSchemaNotBackedByHoptimatorJdbcSchemaThrowsSqlException() { + // The schema resolves AND a leaf table exists, but the schema isn't a Hoptimator JDBC + // schema — e.g. an information_schema with system tables. We can't produce a K8s-side + // identifier, so we error rather than silently building an unresolvable Resource. + SchemaPlus nonHoptimator = schemaWithSubs(null, null); + Lookup tables = mock(Lookup.class); + lenient().doReturn(plainTable()).when(tables).get("TABLE"); + lenient().doReturn(tables).when(nonHoptimator).tables(); + lenient().when(nonHoptimator.unwrap(HoptimatorJdbcSchema.class)).thenReturn(null); + stubRootSchema(schemaWithSubs("INFO", nonHoptimator)); + + SQLException ex = assertThrows(SQLException.class, + () -> GraphService.resolve("INFO.TABLE", connection)); + assertTrue(ex.getMessage().contains("INFO.TABLE"), + "error should name the identifier: " + ex.getMessage()); + assertTrue(ex.getMessage().contains("database"), + "error should explain why: " + ex.getMessage()); + } + + @Test + void renderMatchingFormatDispatchesToRenderer() { + // The happy-path complement of renderUnknownFormatThrows — verifies the renderer is + // actually invoked when format matches (case-insensitive). + GraphNode root = new GraphNode.External("db", List.of("t")); + PipelineGraph graph = new PipelineGraph(root, Set.of(root), Set.of()); + + String out = GraphService.render(graph, "MERMAID"); + assertTrue(out.startsWith("flowchart"), + "Mermaid renderer should be picked case-insensitively and produce a flowchart: " + out); + } + + @Test + void buildGraphSurfacesNoProviderError() { + // No GraphProvider is registered on this module's test classpath, so any resolvable + // identifier reaches the "no provider supports target" branch. Wires through the resolver + // first (identifier must walk Calcite cleanly), then fails dispatch. + SchemaPlus ads = schemaWithDatabaseAndTable("ads-database", "AD_CLICKS", plainTable()); + stubRootSchema(schemaWithSubs("ADS", ads)); + + SQLException ex = assertThrows(SQLException.class, + () -> GraphService.buildGraph("ADS.AD_CLICKS", 1, connection)); + assertTrue(ex.getMessage().contains("No GraphProvider supports"), + "error should explain the dispatch failure: " + ex.getMessage()); + } + + @Test + void buildGraphRejectsNegativeDepth() { + // Negative depths are nonsense — error rather than silently clamping. depth 0 is valid + // (renders just the root); only strictly-negative depths fail. + SQLException ex = assertThrows(SQLException.class, + () -> GraphService.buildGraph("PROFILE.MEMBERS", -1, connection)); + assertTrue(ex.getMessage().contains("depth"), + "error should mention the bad parameter: " + ex.getMessage()); + assertTrue(ex.getMessage().contains("-1"), + "error should echo the bad value: " + ex.getMessage()); + } + + @Test + void resolveSinglePartIdentifierThrowsSqlException() { + // Bare names like "audience" walk no schema segments and hit the root for the table lookup. + // Root has no tables in this fixture → "table not found" with the root's empty schema path. + stubRootSchema(schemaWithSubs(null, null)); + + SQLException ex = assertThrows(SQLException.class, + () -> GraphService.resolve("audience", connection)); + assertTrue(ex.getMessage().contains("audience"), + "error should name the offending identifier: " + ex.getMessage()); + } + + // ─── Mock plumbing ─────────────────────────────────────────────────────── + + /** Plain physical table — not a view, not a LogicalTable. Resolver should treat as Resource. */ + private static Table plainTable() { + return new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory factory) { + return factory.createStructType(List.of(), List.of()); + } + }; + } + + /** Convenience overload: physical (non-logical) database. */ + private static SchemaPlus schemaWithDatabaseAndTable(String crdName, String tableName, Table table) { + return schemaWithDatabaseAndTable(crdName, tableName, table, /*isLogical=*/false); + } + + /** + * Mock a SchemaPlus that's (a) unwrappable as a {@link HoptimatorJdbcSchema} with the given + * CRD name and {@code isLogical()} flag, and (b) when {@code tableName != null}, exposes + * {@code table} via {@code tables().get(tableName)}. + */ + private static SchemaPlus schemaWithDatabaseAndTable(String crdName, String tableName, Table table, + boolean isLogical) { + SchemaPlus schema = schemaWithSubs(null, null); + HoptimatorJdbcSchema hjs = mock(HoptimatorJdbcSchema.class); + lenient().when(hjs.databaseName()).thenReturn(crdName); + lenient().when(hjs.isLogical()).thenReturn(isLogical); + lenient().when(schema.unwrap(HoptimatorJdbcSchema.class)).thenReturn(hjs); + if (tableName != null) { + Lookup tables = mock(Lookup.class); + lenient().doReturn(table).when(tables).get(tableName); + lenient().doReturn(tables).when(schema).tables(); + } + return schema; + } + + private static SchemaPlus schemaWithSubs(String childName, SchemaPlus child) { + Lookup subs = mock(Lookup.class); + if (childName != null) { + lenient().doReturn(child).when(subs).get(childName); + } + // Default tables() to an empty Lookup so resolve.tables().get(...) returns null cleanly + // for schemas that don't host a relevant Table — individual tests can override. + Lookup emptyTables = mock(Lookup.class); + SchemaPlus s = mock(SchemaPlus.class); + lenient().doReturn(subs).when(s).subSchemas(); + lenient().doReturn(emptyTables).when(s).tables(); + return s; + } + + /** Wire {@code connection.calciteConnection().getRootSchema()} to return {@code root}. */ + private void stubRootSchema(SchemaPlus root) { + lenient().when(connection.calciteConnection()).thenReturn(calciteConnection); + lenient().when(calciteConnection.getRootSchema()).thenReturn(root); + } + +} diff --git a/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java b/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java index b565e077..71497c25 100644 --- a/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java +++ b/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java @@ -11,6 +11,9 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.junit.jupiter.api.Assertions; +import com.linkedin.hoptimator.graph.PipelineGraph; +import com.linkedin.hoptimator.graph.mermaid.MermaidRenderer; + import java.io.File; import java.io.FileReader; import java.io.IOException; @@ -23,6 +26,7 @@ import java.nio.file.Files; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -212,6 +216,59 @@ public void execute(Context context, boolean execute) throws Exception { }; } + if (line.startsWith("graph")) { + return new AbstractCommand() { + @Override + public void execute(Context context, boolean execute) throws Exception { + if (execute) { + Connection connection = context.connection(); + if (!(connection instanceof HoptimatorConnection)) { + throw new IllegalArgumentException("This connection doesn't support `!graph`."); + } + HoptimatorConnection conn = (HoptimatorConnection) connection; + String[] parts = line.trim().split("\\s+"); + if (parts.length < 2) { + throw new IllegalArgumentException( + "Usage: !graph [--depth N]"); + } + String identifier = parts[1]; + int depth = 2; + for (int i = 2; i < parts.length - 1; i++) { + if ("--depth".equals(parts[i])) { + depth = Integer.parseInt(parts[i + 1]); + } + } + + try { + PipelineGraph graph = GraphService.buildGraph(identifier, depth, conn); + String mermaid = GraphService.render(graph, MermaidRenderer.FORMAT); + // Strip a trailing newline so the line list matches what's in the .id file (Quidem + // splits the captured content on newlines without a trailing blank). + if (mermaid.endsWith("\n")) { + mermaid = mermaid.substring(0, mermaid.length() - 1); + } + // Resource targets root at an External node. A degenerate Resource graph means + // the schema resolved but no pipeline references the leaf — same behavior as the + // sqlline `!graph` command. + if (depth >= 1 && graph.root() instanceof com.linkedin.hoptimator.graph.GraphNode.External + && graph.nodes().size() == 1 && graph.edges().isEmpty()) { + mermaid = mermaid + "\n%% WARNING: no pipelines reference this resource — the " + + "identifier may not exist or no pipelines have been deployed against it yet."; + } + context.echo(Arrays.asList(mermaid.split("\n"))); + } catch (SQLException e) { + // Identifier didn't resolve, or some other lookup failure — echo the message + // so the .id script can pin the error text verbatim. + context.echo(Arrays.asList(("Error: " + e.getMessage()).split("\n"))); + } + } else { + context.echo(content); + } + context.echo(copy); + } + }; + } + if (line.startsWith("spec")) { return new AbstractCommand() { @Override @@ -239,4 +296,5 @@ public void execute(Context context, boolean execute) throws Exception { return null; } } + } diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java index 23ee79ef..4da0372b 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApi.java @@ -67,6 +67,20 @@ public T getIfExists(String namespace, String name) throws SQLException { return resp.getObject(); } + public T getIfExists(String name) throws SQLException { + final KubernetesApiResponse resp; + if (endpoint.clusterScoped()) { + resp = context.generic(endpoint).get(name); + } else { + resp = context.generic(endpoint).get(context.namespace(), name); + } + if (resp.getHttpStatusCode() == 404) { + return null; + } + K8sUtils.checkResponse("Error getting " + endpoint().kind() + " " + name, resp); + return resp.getObject(); + } + public T get(T obj) throws SQLException { if (obj.getMetadata().getNamespace() == null && !endpoint.clusterScoped()) { obj.getMetadata().namespace(context.namespace()); diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sGraphProvider.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sGraphProvider.java new file mode 100644 index 00000000..3e666d76 --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sGraphProvider.java @@ -0,0 +1,66 @@ +package com.linkedin.hoptimator.k8s; + +import java.sql.Connection; +import java.sql.SQLException; + +import com.linkedin.hoptimator.graph.GraphProvider; +import com.linkedin.hoptimator.graph.GraphTarget; +import com.linkedin.hoptimator.graph.PipelineGraph; + + +/** + * {@link GraphProvider} backed by Hoptimator's K8s state. Wraps {@link PipelineGraphBuilder} — + * the existing entry-point methods become routes for each {@link GraphTarget} subtype: + * + *

    + *
  • {@link GraphTarget.View} → {@link PipelineGraphBuilder#forView(String)} (single-hop; + * depth is ignored — see method Javadoc) + *
  • {@link GraphTarget.LogicalTable} → {@link PipelineGraphBuilder#forLogicalTable(String)} + * (single-hop; depth is ignored — see method Javadoc) + *
  • {@link GraphTarget.Resource} → {@link PipelineGraphBuilder#forResource(String, java.util.List, int)}. + * SQL-identifier-to-CRD-name resolution happens in {@link com.linkedin.hoptimator.jdbc.GraphService} + * before dispatch — by the time the target reaches us, {@code Resource.database()} is the + * K8s Database CRD's {@code metadata.name}. + *
+ * + *

Registered via {@code META-INF/services/com.linkedin.hoptimator.graph.GraphProvider} so callers + * reach it through {@link com.linkedin.hoptimator.jdbc.GraphService}. + */ +public class K8sGraphProvider implements GraphProvider { + + @Override + public PipelineGraph forTarget(GraphTarget target, int depth, Connection connection) + throws SQLException { + if (connection == null) { + throw new SQLException("K8sGraphProvider.forTarget requires a non-null JDBC connection; " + + "K8sContext can't be derived from null."); + } + K8sContext context = K8sContext.create(connection); + PipelineGraphBuilder builder = createBuilder(context); + if (target instanceof GraphTarget.View) { + GraphTarget.View v = (GraphTarget.View) target; + return builder.forView(v.name()); + } + if (target instanceof GraphTarget.LogicalTable) { + GraphTarget.LogicalTable lt = (GraphTarget.LogicalTable) target; + return builder.forLogicalTable(lt.name()); + } + if (target instanceof GraphTarget.Resource) { + GraphTarget.Resource r = (GraphTarget.Resource) target; + return builder.forResource(r.database(), r.path(), depth); + } + throw new SQLException("K8sGraphProvider does not support target: " + target); + } + + @Override + public boolean supports(GraphTarget target) { + return target instanceof GraphTarget.View + || target instanceof GraphTarget.LogicalTable + || target instanceof GraphTarget.Resource; + } + + /** Factory method so tests can substitute a mock builder. */ + PipelineGraphBuilder createBuilder(K8sContext context) { + return new PipelineGraphBuilder(context); + } +} diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/LogicalTableNames.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/LogicalTableNames.java new file mode 100644 index 00000000..1d4d2121 --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/LogicalTableNames.java @@ -0,0 +1,32 @@ +package com.linkedin.hoptimator.k8s; + + +/** + * Canonical names for the K8s objects implicitly created by the LogicalTable deployer + * (inter-tier {@code Pipeline} CRDs and the offline-tier {@code TableTrigger} CRD). + * + *

Shared between the producer (the deployer in {@code hoptimator-logical}) and consumers + * (the visualizer's {@code PipelineGraphBuilder} in {@code hoptimator-k8s}) so name-based + * deduction stays in lockstep with creation. If the producer ever changes the scheme, every + * consumer that derives names breaks at compile time when this class moves. + * + *

Inputs use the un-prefixed table name (e.g. {@code "testevent"}, not + * {@code "logical-testevent"}) — i.e. {@code LogicalTable.spec.tableName}. + */ +public final class LogicalTableNames { + + private static final String TRIGGER_NAME_FORMAT = "logical-%s-offline-trigger"; + + private LogicalTableNames() { + } + + /** CRD name for the implicit inter-tier Pipeline between {@code fromTier} and {@code toTier}. */ + public static String pipelineName(String tableName, String fromTier, String toTier) { + return "logical-" + K8sUtils.canonicalizeName(tableName) + "-" + fromTier + "-to-" + toTier; + } + + /** CRD name for the offline-tier TableTrigger; only meaningful when an offline tier is present. */ + public static String triggerName(String tableName) { + return String.format(TRIGGER_NAME_FORMAT, K8sUtils.canonicalizeName(tableName)); + } +} diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineGraphBuilder.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineGraphBuilder.java new file mode 100644 index 00000000..ed1988f8 --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineGraphBuilder.java @@ -0,0 +1,703 @@ +package com.linkedin.hoptimator.k8s; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1OwnerReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.linkedin.hoptimator.graph.GraphEdge; +import com.linkedin.hoptimator.graph.GraphNode; +import com.linkedin.hoptimator.graph.PipelineGraph; +import com.linkedin.hoptimator.k8s.models.V1alpha1LogicalTable; +import com.linkedin.hoptimator.k8s.models.V1alpha1LogicalTableList; +import com.linkedin.hoptimator.k8s.models.V1alpha1LogicalTableSpec; +import com.linkedin.hoptimator.k8s.models.V1alpha1LogicalTableSpecTiers; +import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline; +import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerList; +import com.linkedin.hoptimator.k8s.models.V1alpha1View; +import com.linkedin.hoptimator.k8s.models.V1alpha1ViewList; + + +/** + * Builds a {@link PipelineGraph} for visualization by walking K8s CRDs and the + * {@code depends-on-*} dependency index stamped on Pipeline CRDs. + * + *

Three entry points: + *

    + *
  • {@link #forView(String)} — graph rooted at a {@link V1alpha1View}.
  • + *
  • {@link #forLogicalTable(String)} — graph rooted at a {@link V1alpha1LogicalTable}.
  • + *
  • {@link #forResource(String, List, int)} — reverse lookup; graph rooted at an external resource.
  • + *
+ * + *

Depth bounds traversal in both directions through the {@code depends-on-} label + * selector. Callers pick whatever depth they want; the builder only floors negatives at 0 + * so the recursion always terminates. + */ +public final class PipelineGraphBuilder { + + private static final Logger LOG = LoggerFactory.getLogger(PipelineGraphBuilder.class); + + private final K8sApi viewApi; + private final K8sApi pipelineApi; + private final K8sApi logicalTableApi; + private final K8sApi triggerApi; + + public PipelineGraphBuilder(K8sContext context) { + this(new K8sApi<>(context, K8sApiEndpoints.VIEWS), + new K8sApi<>(context, K8sApiEndpoints.PIPELINES), + new K8sApi<>(context, K8sApiEndpoints.LOGICAL_TABLES), + new K8sApi<>(context, K8sApiEndpoints.TABLE_TRIGGERS)); + } + + /** Package-private constructor for tests; accepts pre-built (or fake) K8s APIs. */ + PipelineGraphBuilder(K8sApi viewApi, + K8sApi pipelineApi, + K8sApi logicalTableApi, + K8sApi triggerApi) { + this.viewApi = viewApi; + this.pipelineApi = pipelineApi; + this.logicalTableApi = logicalTableApi; + this.triggerApi = triggerApi; + } + + // ─── Public entry points ───────────────────────────────────────────────── + + public PipelineGraph forView(String name) throws SQLException { + // Accept SQL-side identifiers (e.g. {@code VENICE.test-store$insert-partial}) — the CRD is + // stored under a canonicalized name (lowercase, {@code _} stripped, {@code $} → {@code -}, + // dot-separated parts joined with {@code -}). Canonicalization is idempotent, so passing the + // already-canonical CRD name still works. + String crdName = K8sUtils.canonicalizeName(Arrays.asList(name.split("\\."))); + V1alpha1View view = viewApi.get(crdName); + if (view.getSpec() == null) { + throw new SQLException("view " + crdName + " not found"); + } + boolean materialized = Boolean.TRUE.equals(view.getSpec().getMaterialized()); + GraphNode.View root = new GraphNode.View(crdName, materialized); + + Traversal t = new Traversal(); + t.addNode(root); + + // Materialized views own a Pipeline with the same CRD name (see K8sMaterializedViewDeployer). + // Look it up directly — avoids a namespace-wide LIST. The owner-ref check still runs to defend + // against a coincidentally-named pipeline that isn't actually owned by this view (e.g. stale + // pipeline from a recreated view CRD with a fresh UID). Expansion is single-hop: "what this + // view does," not the full upstream chain — for that, run !graph on a source identifier + // (Resource targets honor depth). + if (materialized) { + String viewUid = view.getMetadata() == null ? null : view.getMetadata().getUid(); + V1alpha1Pipeline pipeline = pipelineApi.getIfExists(crdName); + if (pipeline != null && ownedBy(pipeline.getMetadata(), "View", crdName, viewUid)) { + GraphNode.Pipeline pipeNode = t.expandPipelineDirected(pipeline, Direction.UPSTREAM, 0); + if (pipeNode != null) { + t.addEdge(new GraphEdge(root, pipeNode, GraphEdge.Type.OWNER_OF)); + } + } + } + + return t.build(root); + } + + public PipelineGraph forLogicalTable(String name) throws SQLException { + // Same canonicalization as forView — accept SQL-side identifiers and resolve to the + // canonicalized CRD name. + String crdName = K8sUtils.canonicalizeName(Arrays.asList(name.split("\\."))); + V1alpha1LogicalTable lt = logicalTableApi.get(crdName); + Map tierMap = tierMap(lt.getSpec()); + GraphNode.LogicalTable root = new GraphNode.LogicalTable(crdName, tierMap); + + Traversal t = new Traversal(); + t.addNode(root); + + String ltUid = lt.getMetadata() == null ? null : lt.getMetadata().getUid(); + String tableName = lt.getSpec() == null ? null : lt.getSpec().getTableName(); + + // Implicit inter-tier pipelines: the LogicalTable deployer names them via + // {@link LogicalTableNames#pipelineName} — derive every (from, to) candidate from the tier + // list and probe each by name. The deployer only actually creates a subset of pairs (today: + // nearline→online, nearline→offline), so most candidate GETs miss with 404 — that's fine, and + // it keeps the visualizer decoupled from the deployer's pair-selection rules: whatever the + // deployer actually creates is what we render. Owner-ref check defends against a coincidentally + // -named pipeline owned by something else (or a stale CRD with a different UID). Expansion is + // single-hop — for the chain view, run !graph on a source identifier (Resource targets honor + // depth). + if (tableName != null) { + List tierNames = new ArrayList<>(tierMap.keySet()); + for (String fromTier : tierNames) { + for (String toTier : tierNames) { + if (fromTier.equals(toTier)) { + continue; + } + String candidate = LogicalTableNames.pipelineName(tableName, fromTier, toTier); + V1alpha1Pipeline pipeline = pipelineApi.getIfExists(candidate); + if (pipeline != null && ownedBy(pipeline.getMetadata(), "LogicalTable", crdName, ltUid)) { + GraphNode.Pipeline pipeNode = t.expandPipelineDirected(pipeline, Direction.UPSTREAM, 0); + if (pipeNode != null) { + t.addEdge(new GraphEdge(root, pipeNode, GraphEdge.Type.OWNER_OF)); + } + } + } + } + } + + // OWNER_OF any external whose database matches a tier database — that's a tier-physical + // resource of *this* LogicalTable, so the renderer nests it inside the appropriate tier + // subgraph. Externals that aren't tier-physical (e.g. downstream consumers reached during + // recursive expansion) stay outside. + for (GraphNode node : new ArrayList<>(t.nodes())) { + if (node instanceof GraphNode.External + && tierMap.containsValue(((GraphNode.External) node).database())) { + t.addEdge(new GraphEdge(root, node, GraphEdge.Type.OWNER_OF)); + } + } + + // Owned trigger: the LogicalTable deployer creates at most one trigger per table + // ({@link LogicalTableNames#triggerName}), gated on an offline tier being present. We probe + // unconditionally — a 404 simply means there's no offline tier in this table's spec, which is + // exactly the absence we want to render. + if (tableName != null) { + String triggerCandidate = LogicalTableNames.triggerName(tableName); + V1alpha1TableTrigger trigger = triggerApi.getIfExists(triggerCandidate); + if (trigger != null && ownedBy(trigger.getMetadata(), "LogicalTable", crdName, ltUid)) { + GraphNode.Trigger tNode = triggerNode(trigger); + t.addNode(tNode); + t.addEdge(new GraphEdge(root, tNode, GraphEdge.Type.OWNER_OF)); + attachTriggerFlowEdges(t, trigger, tNode); + } + } + + return t.build(root); + } + + public PipelineGraph forResource(String database, List path, int depth) throws SQLException { + GraphNode.External root = externalNode(database, path); + + int cappedDepth = Math.max(depth, 0); + Traversal t = new Traversal(); + t.addNode(root); + // Reverse-lookup mode: walk in both directions from the root, but each recursion preserves + // its own direction (upstream stays upstream, downstream stays downstream) so unrelated + // siblings of intermediate pipelines don't leak in. + t.expandFromResource(root, cappedDepth, Direction.UPSTREAM); + t.expandFromResource(root, cappedDepth, Direction.DOWNSTREAM); + + return t.build(root); + } + + // ─── Helpers ───────────────────────────────────────────────────────────── + + private static boolean ownedBy(V1ObjectMeta meta, String ownerKind, String ownerName, String ownerUid) { + if (meta == null || meta.getOwnerReferences() == null) { + return false; + } + for (V1OwnerReference ref : meta.getOwnerReferences()) { + if (!ownerKind.equals(ref.getKind())) { + continue; + } + // UID match is strictest; fall back to name match (matches the dep-guard's relaxation). + if (ownerUid != null && ownerUid.equals(ref.getUid())) { + return true; + } + if (ownerName.equals(ref.getName())) { + return true; + } + } + return false; + } + + private GraphNode.External externalNode(String database, List path) { + return new GraphNode.External(database, path); + } + + private static Map tierMap(V1alpha1LogicalTableSpec spec) { + Map out = new LinkedHashMap<>(); + if (spec == null || spec.getTiers() == null) { + return out; + } + for (Map.Entry e : spec.getTiers().entrySet()) { + // Skip tiers with no resolved database — the downstream renderer's tier lookup keys on the + // database value, so a null binding can't be matched and would silently become a malformed + // subgraph. Better to drop the malformed entry and surface the rest of the LogicalTable. + String tierDb = e.getValue() == null ? null : e.getValue().getDatabase(); + if (tierDb == null) { + continue; + } + out.put(e.getKey(), tierDb); + } + return out; + } + + /** + * Pull the workhorse {@code kind:} out of the rendered job YAML inside + * {@code V1alpha1PipelineSpec.yaml}. The yaml stream interleaves data resources (KafkaTopic, + * VeniceStore, etc.) with the actual execution artifact. We pick the first kind whose name + * ends in {@code Job} — covers FlinkSessionJob, SqlJob, EtlJob, batch/v1 Job, + * etc. without having to maintain an allowlist. Returns null if no Job-suffixed kind is found. + */ + static String extractJobKind(String yaml) { + if (yaml == null || yaml.isEmpty()) { + return null; + } + Matcher m = Pattern.compile("(?m)^kind:\\s*(\\S*Job)\\s*$").matcher(yaml); + return m.find() ? m.group(1) : null; + } + + /** + * Cheap engine inference from the job kind. Used for non-{@code SqlJob} job kinds (e.g. + * {@code FlinkSessionJob} where the engine is encoded in the kind name. + * For {@code SqlJob}, use {@link #extractSqlJobField} on {@code spec.dialect} directly — + * the engine is a structured spec field, not something to infer. + */ + static String inferEngine(String jobKind, String yaml) { + if (jobKind == null) { + return null; + } + String kindLower = jobKind.toLowerCase(); + boolean mentionsBeam = kindLower.contains("beam") + || (yaml != null && yaml.toLowerCase().contains("beam")); + if (mentionsBeam && kindLower.contains("flink")) { + return "Flink Beam"; + } + if (mentionsBeam) { + return "Beam"; + } + if (kindLower.contains("flink")) { + return "Flink"; + } + if (kindLower.contains("spark")) { + return "Spark"; + } + return null; + } + + /** + * Pull a scalar field from the {@code spec:} block of the {@code SqlJob} doc in a rendered + * pipeline YAML. The rendered YAML interleaves data resources (KafkaTopic, VeniceStore, …) with + * the SqlJob, so we scope the search to the slice after {@code kind: SqlJob} and before the + * next document boundary. Returns null when the YAML doesn't contain a SqlJob or the field is + * absent. Intended for the CRD's typed-enum fields ({@code dialect}, {@code executionMode}), + * which the deployer always writes as bare scalars. + */ + static String extractSqlJobField(String yaml, String fieldName) { + if (yaml == null || yaml.isEmpty() || fieldName == null) { + return null; + } + Matcher kindMatcher = Pattern.compile("(?m)^kind:\\s*SqlJob\\s*$").matcher(yaml); + if (!kindMatcher.find()) { + return null; + } + String tail = yaml.substring(kindMatcher.start()); + int boundary = tail.indexOf("\n---"); + if (boundary >= 0) { + tail = tail.substring(0, boundary); + } + Matcher fieldMatcher = Pattern.compile("(?m)^\\s+" + Pattern.quote(fieldName) + ":\\s*(\\S+)\\s*$") + .matcher(tail); + return fieldMatcher.find() ? fieldMatcher.group(1) : null; + } + + private static GraphNode.Trigger triggerNode(V1alpha1TableTrigger trigger) { + String name = trigger.getMetadata() == null ? "" : trigger.getMetadata().getName(); + String schedule = trigger.getSpec() == null ? null : trigger.getSpec().getSchedule(); + boolean paused = trigger.getSpec() != null && Boolean.TRUE.equals(trigger.getSpec().getPaused()); + String yaml = trigger.getSpec() == null ? null : trigger.getSpec().getYaml(); + String containerName = extractFirstContainerName(yaml); + String jobTemplate = extractJobTemplateName(name, yaml); + return new GraphNode.Trigger(name, schedule, paused, jobTemplate, containerName); + } + + /** + * Derive the JobTemplate name from the rendered Job's {@code metadata.name}. The deployer + * builds it as {@code -} (both canonicalized), so we strip the + * trigger-name prefix to recover the template name. Returns null if the prefix doesn't match + * (the YAML doesn't follow the convention, or the name isn't present). + */ + static String extractJobTemplateName(String triggerName, String yaml) { + if (triggerName == null || yaml == null || yaml.isEmpty()) { + return null; + } + String jobName = extractMetadataName(yaml); + if (jobName == null) { + return null; + } + String prefix = triggerName + "-"; + if (!jobName.startsWith(prefix) || jobName.length() == prefix.length()) { + return null; + } + return jobName.substring(prefix.length()); + } + + /** Top-level {@code metadata.name:} value in a Kubernetes YAML doc; nullable. */ + static String extractMetadataName(String yaml) { + if (yaml == null || yaml.isEmpty()) { + return null; + } + int metadataIdx = yaml.indexOf("metadata:"); + if (metadataIdx < 0) { + return null; + } + Matcher m = Pattern.compile("(?m)^\\s+name:\\s*(\\S+)\\s*$") + .matcher(yaml.substring(metadataIdx)); + return m.find() ? m.group(1) : null; + } + + /** + * Attach trigger flow edges. Reads the {@code depends-on-sources} and {@code depends-on-sink} + * annotations and emits dotted edges in the data-flow direction: + * {@code source --(dotted)--> trigger --(dotted)--> sink}. If only one endpoint resolves to + * an External in the graph, only that edge is emitted (Shape A). + */ + private static void attachTriggerFlowEdges(PipelineGraphBuilder.Traversal t, + V1alpha1TableTrigger trigger, GraphNode.Trigger tNode) { + String sourceIdentifier = triggerSourceIdentifier(trigger); + if (sourceIdentifier != null) { + GraphNode.External source = t.findExternalByIdentifier(sourceIdentifier); + if (source != null) { + t.addEdge(new GraphEdge(source, tNode, GraphEdge.Type.TRIGGERS)); + } + } + String sinkIdentifier = triggerSinkIdentifier(trigger); + if (sinkIdentifier != null) { + GraphNode.External sink = t.findExternalByIdentifier(sinkIdentifier); + if (sink != null) { + t.addEdge(new GraphEdge(tNode, sink, GraphEdge.Type.TRIGGERS)); + } + } + } + + /** Pull the trigger's source identifier from the {@code depends-on-sources} annotation. */ + private static String triggerSourceIdentifier(V1alpha1TableTrigger trigger) { + return firstAnnotationIdentifier(trigger, DependencyLabels.ANNOTATION_KEY_SOURCES); + } + + /** Pull the trigger's sink identifier from the {@code depends-on-sinks} annotation. Today a + * trigger has at most one sink, so we return the first; if/when triggers carry multiple sinks + * this needs to grow into an iteration. */ + private static String triggerSinkIdentifier(V1alpha1TableTrigger trigger) { + return firstAnnotationIdentifier(trigger, DependencyLabels.ANNOTATION_KEY_SINKS); + } + + /** + * First identifier parsed from a {@code depends-on-*} annotation on the trigger, or null when + * the annotation is missing/empty. {@link DependencyLabels#parseAnnotation(String)} treats null + * inputs as empty, so no caller-side null guard is needed. + */ + private static String firstAnnotationIdentifier(V1alpha1TableTrigger trigger, String annotationKey) { + Set ids = DependencyLabels.parseAnnotation(annotationValue(trigger, annotationKey)); + return ids.isEmpty() ? null : ids.iterator().next(); + } + + private static String annotationValue(V1alpha1TableTrigger trigger, String key) { + if (trigger.getMetadata() == null || trigger.getMetadata().getAnnotations() == null) { + return null; + } + return trigger.getMetadata().getAnnotations().get(key); + } + + /** First container's {@code name:} in the {@code containers:} list; nullable. */ + static String extractFirstContainerName(String yaml) { + if (yaml == null || yaml.isEmpty()) { + return null; + } + int containersIdx = yaml.indexOf("containers:"); + if (containersIdx < 0) { + return null; + } + Matcher m = Pattern.compile("(?m)^\\s*-\\s*name:\\s*(\\S+)\\s*$") + .matcher(yaml.substring(containersIdx)); + return m.find() ? m.group(1) : null; + } + + + // ─── Traversal state holder ────────────────────────────────────────────── + + /** + * Direction of recursive expansion through pipelines/triggers. + * + *

    + *
  • {@link #UPSTREAM} — follow producers. From an external, find pipelines/triggers where + * it appears as the *sink*; from a pipeline, walk to its sources.
  • + *
  • {@link #DOWNSTREAM} — follow consumers. From an external, find pipelines/triggers + * where it appears as a *source*; from a pipeline, walk to its sink.
  • + *
+ * + *

Direction stays sticky across one recursive walk, so unrelated siblings of an + * intermediate pipeline don't get pulled in via the opposite direction. + */ + enum Direction { UPSTREAM, DOWNSTREAM } + + private final class Traversal { + private final Map nodes = new LinkedHashMap<>(); + private final Set edges = new LinkedHashSet<>(); + private final Set expandedResources = new HashSet<>(); + private final Set expandedPipelines = new HashSet<>(); + + void addNode(GraphNode node) { + nodes.putIfAbsent(node.id(), node); + } + + void addEdge(GraphEdge edge) { + edges.add(edge); + } + + /** All nodes added so far, in insertion order. */ + Collection nodes() { + return nodes.values(); + } + + PipelineGraph build(GraphNode root) { + return new PipelineGraph(root, new LinkedHashSet<>(nodes.values()), edges); + } + + /** Find an {@link GraphNode.External} matching a {@code _} identifier. */ + GraphNode.External findExternalByIdentifier(String identifier) { + if (identifier == null) { + return null; + } + for (GraphNode n : nodes.values()) { + if (n instanceof GraphNode.External) { + GraphNode.External e = (GraphNode.External) n; + if (identifier.equals(DependencyLabels.identifier(e.database(), e.path()))) { + return e; + } + } + } + return null; + } + + + /** + * Fan out from an external resource in one direction at a time. {@link Direction#UPSTREAM} + * finds pipelines/triggers that *produce* {@code resource} (where it appears as the sink); + * {@link Direction#DOWNSTREAM} finds those that *consume* it (where it appears as a source). + * Recursion stays in the same direction — once you're walking upstream, an intermediate + * pipeline's siblings don't get pulled in via their downstream side. + */ + void expandFromResource(GraphNode.External resource, int remainingDepth, Direction direction) + throws SQLException { + String key = direction.name() + "/" + resource.database() + "/" + String.join(".", resource.path()); + if (!expandedResources.add(key) || remainingDepth <= 0) { + return; + } + String labelKey = DependencyLabels.labelKey(resource.database(), resource.path()); + String identifier = DependencyLabels.identifier(resource.database(), resource.path()); + + Collection matches = pipelineApi.select(labelKey); + for (V1alpha1Pipeline pipeline : matches) { + if (!annotationMentions(pipeline, identifier)) { + // Stale label or hash collision — ignore. + continue; + } + Set sources = parseAnnotation(pipeline, DependencyLabels.ANNOTATION_KEY_SOURCES); + Set sinks = parseAnnotation(pipeline, DependencyLabels.ANNOTATION_KEY_SINKS); + if (direction == Direction.UPSTREAM) { + // Only pipelines that *produce* resource (resource is one of the sinks). Pipelines + // that merely consume resource are downstream — skip them in upstream walks. + if (!sinks.contains(identifier)) { + continue; + } + } else { + // Only pipelines that *consume* resource (resource is one of the sources). + if (!sources.contains(identifier)) { + continue; + } + } + expandPipelineDirected(pipeline, direction, remainingDepth); + } + + // Triggers carry the same `depends-on-` labels. Same direction filter: + // UPSTREAM — trigger's sink == resource (trigger produces resource). + // DOWNSTREAM — trigger's source == resource (trigger consumes resource). + Collection triggerMatches = triggerApi.select(labelKey); + for (V1alpha1TableTrigger trigger : triggerMatches) { + String triggerSource = triggerSourceIdentifier(trigger); + String triggerSink = triggerSinkIdentifier(trigger); + if (!identifier.equals(triggerSource) && !identifier.equals(triggerSink)) { + // Annotation cross-check — skip stale labels and slug collisions. + continue; + } + if (direction == Direction.UPSTREAM && !identifier.equals(triggerSink)) { + continue; + } + if (direction == Direction.DOWNSTREAM && !identifier.equals(triggerSource)) { + continue; + } + GraphNode.Trigger tNode = triggerNode(trigger); + addNode(tNode); + if (direction == Direction.UPSTREAM) { + // Trigger produces resource; arrow points into resource. + addEdge(new GraphEdge(tNode, resource, GraphEdge.Type.TRIGGERS)); + // Walk further upstream from the trigger's source (if any) — that's what feeds + // the trigger. + if (triggerSource != null) { + GraphNode.External srcExt = externalFromIdentifier(triggerSource); + addNode(srcExt); + addEdge(new GraphEdge(srcExt, tNode, GraphEdge.Type.TRIGGERS)); + expandFromResource(srcExt, remainingDepth - 1, Direction.UPSTREAM); + } + } else { + // Trigger consumes resource; arrow points away from resource. + addEdge(new GraphEdge(resource, tNode, GraphEdge.Type.TRIGGERS)); + if (triggerSink != null) { + GraphNode.External sinkExt = externalFromIdentifier(triggerSink); + addNode(sinkExt); + addEdge(new GraphEdge(tNode, sinkExt, GraphEdge.Type.TRIGGERS)); + expandFromResource(sinkExt, remainingDepth - 1, Direction.DOWNSTREAM); + } + } + } + } + + /** + * Ingest a Pipeline CRD into the graph. Reads source/sink annotations, creates edges, + * and schedules transitive expansion from the other endpoints. Returns the Pipeline node + * so the caller can attach owner-of edges. + */ + GraphNode.Pipeline expandPipeline(V1alpha1Pipeline pipeline) { + String pipeId = pipelineId(pipeline); + if (pipeId == null || !expandedPipelines.add(pipeId)) { + return null; + } + GraphNode.Pipeline pipeNode = pipelineNode(pipeline); + addNode(pipeNode); + + Set sources = parseAnnotation(pipeline, DependencyLabels.ANNOTATION_KEY_SOURCES); + Set sinks = parseAnnotation(pipeline, DependencyLabels.ANNOTATION_KEY_SINKS); + + for (String id : sources) { + GraphNode.External ext = externalFromIdentifier(id); + addNode(ext); + addEdge(new GraphEdge(ext, pipeNode, GraphEdge.Type.DEPENDS_ON_SOURCE)); + } + for (String id : sinks) { + GraphNode.External ext = externalFromIdentifier(id); + addNode(ext); + addEdge(new GraphEdge(pipeNode, ext, GraphEdge.Type.DEPENDS_ON_SINK)); + } + return pipeNode; + } + + /** + * Ingest a pipeline and recursively expand in one direction. The pipeline node is added with + * all its source/sink edges (so the user always sees the pipeline's full shape), but the + * recursion only walks further in {@code direction}: + * + *

    + *
  • {@link Direction#UPSTREAM} — recurse upstream from each of the pipeline's sources. + * The sink is rendered as a leaf node here, since we don't follow it downstream.
  • + *
  • {@link Direction#DOWNSTREAM} — recurse downstream from the pipeline's sink. The + * sources stay as leaves.
  • + *
+ */ + GraphNode.Pipeline expandPipelineDirected(V1alpha1Pipeline pipeline, Direction direction, + int remainingDepth) throws SQLException { + GraphNode.Pipeline pipeNode = expandPipeline(pipeline); + if (pipeNode == null) { + return null; + } + if (remainingDepth <= 0) { + return pipeNode; + } + Set sources = parseAnnotation(pipeline, DependencyLabels.ANNOTATION_KEY_SOURCES); + Set sinks = parseAnnotation(pipeline, DependencyLabels.ANNOTATION_KEY_SINKS); + if (direction == Direction.UPSTREAM) { + for (String id : sources) { + GraphNode.External srcExt = externalFromIdentifier(id); + expandFromResource(srcExt, remainingDepth - 1, Direction.UPSTREAM); + } + } else { + for (String id : sinks) { + GraphNode.External sinkExt = externalFromIdentifier(id); + expandFromResource(sinkExt, remainingDepth - 1, Direction.DOWNSTREAM); + } + } + return pipeNode; + } + + private GraphNode.External externalFromIdentifier(String identifier) { + // Identifier shape: _. Split on the first underscore. + int idx = identifier.indexOf('_'); + if (idx < 0) { + // Missing the separator — likely a stale or hand-edited depends-on annotation. Surface as + // a degraded External (database == identifier, empty path) so we render *something*, but + // log so operators can spot the corruption in production. + LOG.warn("depends-on identifier {} is malformed (missing '_' database/path separator); " + + "rendering as degraded External with no path", identifier); + return externalNode(identifier, Collections.emptyList()); + } + String database = identifier.substring(0, idx); + String pathStr = identifier.substring(idx + 1); + List path = pathStr.isEmpty() ? Collections.emptyList() + : Arrays.asList(pathStr.split("\\.")); + return externalNode(database, path); + } + + private GraphNode.Pipeline pipelineNode(V1alpha1Pipeline pipeline) { + String name = pipeline.getMetadata() == null ? "" : pipeline.getMetadata().getName(); + String yaml = pipeline.getSpec() == null ? null : pipeline.getSpec().getYaml(); + String jobKind = extractJobKind(yaml); + // SqlJob carries dialect/executionMode as structured spec fields — prefer those over + // name-based inference. Other job kinds (FlinkSessionJob, etc.) encode + // the engine in the kind name itself, so inferEngine handles them. + String engine; + String executionMode; + if ("SqlJob".equals(jobKind)) { + engine = extractSqlJobField(yaml, "dialect"); + executionMode = extractSqlJobField(yaml, "executionMode"); + } else { + engine = inferEngine(jobKind, yaml); + executionMode = null; + } + return new GraphNode.Pipeline(name, jobKind, engine, executionMode); + } + + private String pipelineId(V1alpha1Pipeline pipeline) { + V1ObjectMeta meta = pipeline.getMetadata(); + if (meta == null) { + return null; + } + return meta.getNamespace() + "/" + meta.getName(); + } + + private Set parseAnnotation(V1alpha1Pipeline pipeline, String key) { + String value = annotationValue(pipeline, key); + return value == null ? Collections.emptySet() : DependencyLabels.parseAnnotation(value); + } + + private String annotationValue(V1alpha1Pipeline pipeline, String key) { + V1ObjectMeta meta = pipeline.getMetadata(); + if (meta == null || meta.getAnnotations() == null) { + return null; + } + return meta.getAnnotations().get(key); + } + + private boolean annotationMentions(V1alpha1Pipeline pipeline, String identifier) { + // Confirm the label-selector match is real — the resource must appear in either the + // sources or sinks annotation. Used to filter out hash collisions and stale labels. + Set sources = parseAnnotation(pipeline, DependencyLabels.ANNOTATION_KEY_SOURCES); + if (sources.contains(identifier)) { + return true; + } + Set sinks = parseAnnotation(pipeline, DependencyLabels.ANNOTATION_KEY_SINKS); + return sinks.contains(identifier); + } + } +} diff --git a/hoptimator-k8s/src/main/resources/META-INF/services/com.linkedin.hoptimator.graph.GraphProvider b/hoptimator-k8s/src/main/resources/META-INF/services/com.linkedin.hoptimator.graph.GraphProvider new file mode 100644 index 00000000..96127e33 --- /dev/null +++ b/hoptimator-k8s/src/main/resources/META-INF/services/com.linkedin.hoptimator.graph.GraphProvider @@ -0,0 +1 @@ +com.linkedin.hoptimator.k8s.K8sGraphProvider diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sGraphProviderTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sGraphProviderTest.java new file mode 100644 index 00000000..ca8cbaaa --- /dev/null +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sGraphProviderTest.java @@ -0,0 +1,97 @@ +package com.linkedin.hoptimator.k8s; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.linkedin.hoptimator.graph.GraphTarget; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + + +/** + * Dispatch tests for {@link K8sGraphProvider}. By the time the provider runs, the resolver in + * {@link com.linkedin.hoptimator.jdbc.GraphService} has already turned the user identifier into + * a typed {@link GraphTarget}; the provider's only job is to route each subtype to the right + * {@link PipelineGraphBuilder} entry point and to advertise what it supports. + */ +@ExtendWith(MockitoExtension.class) +class K8sGraphProviderTest { + + @Mock + private MockedStatic contextStatic; + @Mock + private Connection connection; + @Mock + private K8sContext context; + + /** Subclass that hands out a single mock builder so we can verify which entry point fires. */ + private static K8sGraphProvider providerWith(PipelineGraphBuilder builder) { + return new K8sGraphProvider() { + @Override + PipelineGraphBuilder createBuilder(K8sContext context) { + return builder; + } + }; + } + + @Test + void supportsAcceptsAllThreeGraphTargetSubtypes() { + K8sGraphProvider provider = new K8sGraphProvider(); + assertTrue(provider.supports(new GraphTarget.View("audience"))); + assertTrue(provider.supports(new GraphTarget.LogicalTable("members"))); + assertTrue(provider.supports(new GraphTarget.Resource("kafka", List.of("topic")))); + } + + @Test + void forTargetRoutesViewToBuilderForView() throws SQLException { + PipelineGraphBuilder builder = mock(PipelineGraphBuilder.class); + contextStatic.when(() -> K8sContext.create(connection)).thenReturn(context); + + providerWith(builder).forTarget(new GraphTarget.View("audience"), 3, connection); + + verify(builder).forView(eq("audience")); + } + + @Test + void forTargetRoutesLogicalTableToBuilderForLogicalTable() throws SQLException { + PipelineGraphBuilder builder = mock(PipelineGraphBuilder.class); + contextStatic.when(() -> K8sContext.create(connection)).thenReturn(context); + + providerWith(builder).forTarget(new GraphTarget.LogicalTable("members"), 2, connection); + + verify(builder).forLogicalTable(eq("members")); + } + + @Test + void forTargetRoutesResourceToBuilderForResource() throws SQLException { + PipelineGraphBuilder builder = mock(PipelineGraphBuilder.class); + contextStatic.when(() -> K8sContext.create(connection)).thenReturn(context); + + providerWith(builder).forTarget( + new GraphTarget.Resource("ads-database", List.of("ADS", "AD_CLICKS")), 1, connection); + + verify(builder).forResource(eq("ads-database"), eq(List.of("ADS", "AD_CLICKS")), eq(1)); + } + + @Test + void forTargetThrowsOnNullConnection() { + // K8sContext.create can't derive a context from null; turn that into a clear SQLException + // rather than letting an NPE propagate from the helper. + K8sGraphProvider provider = new K8sGraphProvider(); + SQLException ex = assertThrows(SQLException.class, + () -> provider.forTarget(new GraphTarget.View("v"), 1, null)); + assertTrue(ex.getMessage().contains("non-null"), + "error should explain the requirement: " + ex.getMessage()); + } +} diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineGraphBuilderTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineGraphBuilderTest.java new file mode 100644 index 00000000..e9f26dc2 --- /dev/null +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineGraphBuilderTest.java @@ -0,0 +1,792 @@ +package com.linkedin.hoptimator.k8s; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1OwnerReference; + +import org.junit.jupiter.api.Test; + +import com.linkedin.hoptimator.graph.GraphEdge; +import com.linkedin.hoptimator.graph.GraphNode; +import com.linkedin.hoptimator.graph.PipelineGraph; +import com.linkedin.hoptimator.k8s.models.V1alpha1LogicalTable; +import com.linkedin.hoptimator.k8s.models.V1alpha1LogicalTableSpec; +import com.linkedin.hoptimator.k8s.models.V1alpha1LogicalTableSpecTiers; +import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline; +import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineSpec; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerSpec; +import com.linkedin.hoptimator.k8s.models.V1alpha1View; +import com.linkedin.hoptimator.k8s.models.V1alpha1ViewSpec; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +/** + * End-to-end tests for {@link PipelineGraphBuilder}. Each test seeds a small fixture of CRDs + * via {@link FakeK8sApi} and exercises one of the three entry points. Assertions probe graph + * structure (node kinds, edge types, ownership wiring) rather than rendered output, which is + * tested separately in {@code MermaidRendererTest}. + */ +class PipelineGraphBuilderTest { + + // ─── Test 1: simple MV with two K8s sources + external sink ────────────── + + @Test + void forViewMaterializedRendersSourcesAndSink() throws SQLException { + V1alpha1View view = view("ads", "audience", true, "view-uid-1"); + // Pipeline name == view CRD name (see K8sMaterializedViewDeployer); the visualizer derives + // the name from the view, so the fixture must mirror that contract. + V1alpha1Pipeline pipeline = pipelineWithSourcesAndSink( + "audience", "view-uid-1", "View", + Arrays.asList("kafka1_KAFKA.events", "venice-prod_VENICE.profile"), + "ads_VENICE.audience"); + + PipelineGraphBuilder builder = builder( + list(view), list(pipeline), list(), list()); + + PipelineGraph graph = builder.forView("audience"); + + assertEquals(GraphNode.Kind.VIEW, graph.root().kind()); + // Expected nodes: View root + 1 Pipeline + 2 sources + 1 sink = 5 nodes. + assertEquals(5, graph.nodes().size(), "graph should have view + pipeline + 2 sources + sink"); + + GraphNode pipelineNode = graphNodeByKind(graph, GraphNode.Kind.PIPELINE); + assertNotNull(pipelineNode); + assertTrue(graph.edges().stream().anyMatch(e -> + e.type() == GraphEdge.Type.OWNER_OF + && e.from().equals(graph.root()) + && e.to().equals(pipelineNode)), + "View should own the realizing pipeline"); + + long sourceEdges = graph.edges().stream() + .filter(e -> e.type() == GraphEdge.Type.DEPENDS_ON_SOURCE).count(); + assertEquals(2, sourceEdges, "two source edges expected"); + + long sinkEdges = graph.edges().stream() + .filter(e -> e.type() == GraphEdge.Type.DEPENDS_ON_SINK).count(); + assertEquals(1, sinkEdges, "one sink edge expected"); + } + + // ─── Test 2: MV-on-MV — Calcite inlining, no special-case handling ──────── + + @Test + void forViewMvOnMvRendersOnlyPhysicalSources() throws SQLException { + // CREATE MV A AS SELECT ... FROM kafka.x + // CREATE MV B AS SELECT * FROM A + // After Calcite inlining, B's pipeline reads kafka.x directly. The graph for B should NOT + // mention A — visualization tracks operational truth, not user intent. + V1alpha1View viewB = view("ns", "b", true, "uid-B"); + V1alpha1Pipeline pipeB = pipelineWithSourcesAndSink( + "b", "uid-B", "View", + List.of("kafka1_KAFKA.x"), + "ns_VENICE.B"); + + PipelineGraphBuilder builder = builder( + list(view("ns", "a", true, "uid-A"), viewB), + list(pipeB), + list(), list()); + + PipelineGraph graph = builder.forView("b"); + + boolean mentionsA = graph.nodes().stream().anyMatch(n -> n.id().endsWith("/a")); + assertFalse(mentionsA, "MV-on-MV should not surface upstream View 'A' — only physical sources"); + assertTrue(graph.nodes().stream().anyMatch(n -> + n instanceof GraphNode.External && ((GraphNode.External) n).database().equals("kafka1")), + "MV-on-MV should surface inlined kafka source"); + } + + // ─── Test 2b: forView is scoped to the view's neighborhood ──────────────── + + @Test + void forViewIsScopedToTheViewNeighborhood() throws SQLException { + // Mirrors the real-world bug. Two MVs share a table as their connection point: + // MV "P" reads from ads.ad_clicks and writes to ads.page_views. + // MV "A" reads from ads.page_views (and profile.members) and writes to ads.audience. + // forView on A should show A's own pipeline + sources + sink, nothing else: + // - No P-pipeline (upstream — chain belongs to Resource targets via !graph on the source). + // - No ads.ad_clicks (P's source, not A's direct neighbor). + // forView on P should show P's own pipeline + sources + sink, nothing else: + // - No A-pipeline (downstream of P's sink). + // - No profile.members / ads.audience. + V1alpha1View viewP = view("ads", "p", true, "uid-P"); + V1alpha1Pipeline pPipe = pipelineWithSourcesAndSink( + "p", "uid-P", "View", + List.of("ads-database_ADS.AD_CLICKS"), + "ads-database_ADS.PAGE_VIEWS"); + + V1alpha1View viewA = view("ads", "a", true, "uid-A"); + V1alpha1Pipeline aPipe = pipelineWithSourcesAndSink( + "a", "uid-A", "View", + Arrays.asList("ads-database_ADS.PAGE_VIEWS", "profile-database_PROFILE.MEMBERS"), + "ads-database_ADS.AUDIENCE"); + + PipelineGraphBuilder builder = builder( + list(viewP, viewA), + list(pPipe, aPipe), + list(), list()); + + // forView P — must NOT include downstream A or its surroundings. + PipelineGraph forP = builder.forView("p"); + boolean pHasOwn = forP.nodes().stream().anyMatch(n -> + n instanceof GraphNode.Pipeline && "p".equals(((GraphNode.Pipeline) n).name())); + boolean pHasA = forP.nodes().stream().anyMatch(n -> + n instanceof GraphNode.Pipeline && "a".equals(((GraphNode.Pipeline) n).name())); + assertTrue(pHasOwn, "P's own pipeline should appear in its graph"); + assertFalse(pHasA, "A's pipeline is downstream of P's sink — must not appear"); + assertFalse(forP.nodes().stream().anyMatch(n -> + n instanceof GraphNode.External + && "profile-database".equals(((GraphNode.External) n).database())), + "profile.members only feeds the downstream MV — must not appear"); + + // forView A — must NOT include upstream P or ads.ad_clicks. The chain view is the job of + // a Resource-target query (!graph on the source identifier). A's graph is A's pipeline + + // its direct sources (PAGE_VIEWS, MEMBERS as leaves) + its sink (AUDIENCE as a leaf). + PipelineGraph forA = builder.forView("a"); + boolean aHasOwn = forA.nodes().stream().anyMatch(n -> + n instanceof GraphNode.Pipeline && "a".equals(((GraphNode.Pipeline) n).name())); + boolean aHasP = forA.nodes().stream().anyMatch(n -> + n instanceof GraphNode.Pipeline && "p".equals(((GraphNode.Pipeline) n).name())); + assertTrue(aHasOwn, "A's own pipeline should appear in its graph"); + assertFalse(aHasP, "P's pipeline is upstream of A's source — must not chain into A's view graph"); + assertFalse(forA.nodes().stream().anyMatch(n -> + n instanceof GraphNode.External + && ((GraphNode.External) n).path().contains("AD_CLICKS")), + "ads.ad_clicks is upstream of A's source PAGE_VIEWS — must not chain into A's view graph"); + } + + // ─── Test 3: LogicalTable with three tiers + offline trigger ───────────── + + @Test + void forLogicalTableRendersTiersAndOwnedPipelinesAndTrigger() throws SQLException { + V1alpha1LogicalTable lt = logicalTable("ns", "events", "uid-lt", linked( + "nearline", "kafka-db", + "online", "venice-db", + "offline", "hdfs-db")); + + // Implicit inter-tier pipelines owned by the LogicalTable. Names match the deployer's scheme + // (LogicalTableNames.pipelineName) — the visualizer probes by those exact names. + V1alpha1Pipeline n2o = pipelineWithSourcesAndSink( + "logical-events-nearline-to-online", "uid-lt", "LogicalTable", + List.of("kafka-db_kafka-db.events"), + "venice-db_venice-db.events"); + V1alpha1Pipeline n2f = pipelineWithSourcesAndSink( + "logical-events-nearline-to-offline", "uid-lt", "LogicalTable", + List.of("kafka-db_kafka-db.events"), + "hdfs-db_hdfs-db.events"); + // Bridging trigger: reads from offline (source) and writes to online (sink), reverse-ETL. + // Name matches LogicalTableNames.triggerName("events"). + V1alpha1TableTrigger trigger = trigger("ns", "logical-events-offline-trigger", + "uid-lt", "0 */6 * * *", + "hdfs-db_hdfs-db.events", // source (offline) + "venice-db_venice-db.events"); // sink (online) + + PipelineGraphBuilder builder = builder( + list(), + list(n2o, n2f), + list(lt), + list(trigger)); + + PipelineGraph graph = builder.forLogicalTable("events"); + + assertEquals(GraphNode.Kind.LOGICAL_TABLE, graph.root().kind()); + long pipelineCount = graph.nodes().stream() + .filter(n -> n.kind() == GraphNode.Kind.PIPELINE).count(); + assertEquals(2, pipelineCount, "two implicit pipelines expected"); + + long triggerCount = graph.nodes().stream() + .filter(n -> n.kind() == GraphNode.Kind.TRIGGER).count(); + assertEquals(1, triggerCount, "offline trigger expected"); + + long triggerEdges = graph.edges().stream() + .filter(e -> e.type() == GraphEdge.Type.TRIGGERS).count(); + assertEquals(2, triggerEdges, + "bridging trigger should produce two TRIGGERS edges: source→trigger and trigger→sink"); + + // LT now owns whatever externals match a tier database, plus the 2 pipelines + 1 trigger. + // Three tier-physical externals come from the pipelines (one per tier db) plus the 2 + // pipelines + 1 trigger = 6 OWNER_OF edges. + long ownerEdges = graph.edges().stream() + .filter(e -> e.type() == GraphEdge.Type.OWNER_OF + && e.from().equals(graph.root())).count(); + assertEquals(6, ownerEdges, + "LogicalTable should own its tier externals + 2 pipelines + 1 trigger; got: " + ownerEdges); + } + + // ─── Test: direction filter doesn't leak unrelated siblings ────────────── + + @Test + void forResourceDirectionFilterDoesNotLeakUnrelatedSiblings() throws SQLException { + // Three pipelines wired so a flipped direction check would visibly leak. From the user's + // reverse-lookup on B, only X (writes to B) and Y (reads from B) are part of B's neighborhood. + // Z reads from A — A is a transitive upstream of B (via X), but Z is X's sibling, not B's + // ancestor. The direction-aware traversal must not pull Z in. + // + // X: A --> B + // Y: B --> C + // Z: A --> D <-- must NOT appear in B's reverse lookup + V1alpha1Pipeline x = pipelineWithSourcesAndSink( + "x", "uid-x", "Job", + List.of("dbA_X.A"), + "dbB_Y.B"); + V1alpha1Pipeline y = pipelineWithSourcesAndSink( + "y", "uid-y", "Job", + List.of("dbB_Y.B"), + "dbC_Z.C"); + V1alpha1Pipeline z = pipelineWithSourcesAndSink( + "z", "uid-z", "Job", + List.of("dbA_X.A"), + "dbD_W.D"); + + PipelineGraphBuilder builder = builder(list(), list(x, y, z), list(), list()); + + PipelineGraph graph = builder.forResource("dbB", List.of("Y", "B"), 3); + + boolean hasX = graph.nodes().stream().anyMatch(n -> + n instanceof GraphNode.Pipeline && "x".equals(((GraphNode.Pipeline) n).name())); + boolean hasY = graph.nodes().stream().anyMatch(n -> + n instanceof GraphNode.Pipeline && "y".equals(((GraphNode.Pipeline) n).name())); + boolean hasZ = graph.nodes().stream().anyMatch(n -> + n instanceof GraphNode.Pipeline && "z".equals(((GraphNode.Pipeline) n).name())); + assertTrue(hasX, "X writes to B — must appear via upstream walk"); + assertTrue(hasY, "Y reads from B — must appear via downstream walk"); + assertFalse(hasZ, + "Z shares only an upstream-of-upstream resource with B — must NOT leak through the " + + "direction filter"); + } + + // ─── ownedBy edge cases ────────────────────────────────────────────────── + + @Test + void forLogicalTableIncludesPipelineViaOwnerNameFallback() throws SQLException { + // Etcd restore (or any operation that recreates the LogicalTable with a fresh UID) leaves + // owned Pipelines pointing at the prior UID. ownedBy()'s name fallback exists to keep the + // ownership relationship intact in that case — the visualizer must still draw the OWNER_OF + // edge so the graph doesn't go silently empty after a restore. + V1alpha1LogicalTable lt = logicalTable("ns", "events", "current-uid", linked( + "nearline", "kafka-db", + "online", "venice-db")); + + V1alpha1Pipeline pipeline = pipelineWithSourcesAndSink( + "logical-events-nearline-to-online", "stale-uid-from-prior-incarnation", "LogicalTable", + List.of("kafka-db_kafka-db.events"), + "venice-db_venice-db.events"); + + PipelineGraphBuilder builder = builder(list(), list(pipeline), list(lt), list()); + + PipelineGraph graph = builder.forLogicalTable("events"); + + long ownerEdges = graph.edges().stream() + .filter(e -> e.type() == GraphEdge.Type.OWNER_OF && e.from().equals(graph.root()) + && e.to() instanceof GraphNode.Pipeline) + .count(); + assertEquals(1, ownerEdges, + "name-fallback in ownedBy() must keep the OWNER_OF edge across a UID change"); + } + + @Test + void forLogicalTableSkipsPipelineWithWrongOwnerKind() throws SQLException { + // A pipeline coincidentally named like a LogicalTable's inter-tier pipeline but actually + // owned by a different kind (e.g. a Job CRD) must NOT be claimed as the LogicalTable's child. + // ownedBy()'s kind check is the guard. + V1alpha1LogicalTable lt = logicalTable("ns", "events", "lt-uid", linked( + "nearline", "kafka-db", + "online", "venice-db")); + + V1alpha1Pipeline impostor = pipelineWithSourcesAndSink( + "logical-events-nearline-to-online", "lt-uid", "Job", // kind=Job, not LogicalTable + List.of("kafka-db_kafka-db.events"), + "venice-db_venice-db.events"); + + PipelineGraphBuilder builder = builder(list(), list(impostor), list(lt), list()); + + PipelineGraph graph = builder.forLogicalTable("events"); + + long ownerEdges = graph.edges().stream() + .filter(e -> e.type() == GraphEdge.Type.OWNER_OF && e.to() instanceof GraphNode.Pipeline) + .count(); + assertEquals(0, ownerEdges, + "wrong-kind owner-ref must not be claimed as the LogicalTable's child"); + } + + // ─── Null-metadata defenses ────────────────────────────────────────────── + + @Test + void forLogicalTableHandlesMissingUidAndSpec() throws SQLException { + // CRD shapes pulled from K8s can have a partially-populated metadata block (e.g. missing + // UID before the apiserver assigns one) or a null spec mid-reconciliation. The visualizer + // must surface the LogicalTable root rather than NPE'ing on the missing fields. Owned + // children won't be discoverable (no UID for owner-ref check, no tableName for name + // deduction), so the graph is root-only, but it renders. + V1alpha1LogicalTable lt = new V1alpha1LogicalTable(); + lt.setMetadata(new V1ObjectMeta().namespace("ns").name("events")); // no UID + // spec left null + + PipelineGraphBuilder builder = builder(list(), list(), list(lt), list()); + + PipelineGraph graph = builder.forLogicalTable("events"); + + assertEquals(GraphNode.Kind.LOGICAL_TABLE, graph.root().kind()); + assertEquals(1, graph.nodes().size(), + "root-only graph expected when LogicalTable lacks UID/spec; got: " + graph.nodes()); + } + + @Test + void forLogicalTableSkipsTierWithNullDatabaseBinding() throws SQLException { + // A tier declared without a resolved database binding (e.g. partially-applied LogicalTable + // mid-reconciliation) must not propagate into the GraphNode.LogicalTable tier map as null — + // downstream renderers key on the database value and would silently malform the subgraph. + V1alpha1LogicalTable lt = new V1alpha1LogicalTable(); + V1ObjectMeta meta = new V1ObjectMeta().namespace("ns").name("events").uid("uid"); + lt.setMetadata(meta); + V1alpha1LogicalTableSpec spec = new V1alpha1LogicalTableSpec(); + spec.setTableName("events"); + Map tiers = new LinkedHashMap<>(); + V1alpha1LogicalTableSpecTiers nullBindingTier = new V1alpha1LogicalTableSpecTiers(); + // .database not set — null binding + tiers.put("nearline", nullBindingTier); + V1alpha1LogicalTableSpecTiers boundTier = new V1alpha1LogicalTableSpecTiers(); + boundTier.setDatabase("venice-db"); + tiers.put("online", boundTier); + spec.setTiers(tiers); + lt.setSpec(spec); + + PipelineGraphBuilder builder = builder(list(), list(), list(lt), list()); + + PipelineGraph graph = builder.forLogicalTable("events"); + + GraphNode.LogicalTable root = (GraphNode.LogicalTable) graph.root(); + assertFalse(root.tiers().containsKey("nearline"), + "tier with null database binding must be dropped, not propagated as null"); + assertEquals("venice-db", root.tiers().get("online"), + "bound tier should survive the filtering"); + } + + // ─── Test: trigger pointing at an arbitrary table (not LogicalTable-owned) ─ + + @Test + void forResourceTraversesIntoTriggerTarget() throws SQLException { + // A standalone trigger that fires a pipeline writing to an arbitrary external table. + // forResource on that table should still surface the connected pipeline; the trigger itself + // doesn't live in this graph (it's not owned by the resource), but the pipeline does. + V1alpha1Pipeline pipeline = pipelineWithSourcesAndSink( + "etl-pipeline", "uid-job", "Job", + List.of("source-db_S.input"), + "sink-db_S.output"); + + PipelineGraphBuilder builder = builder( + list(), list(pipeline), list(), list()); + + PipelineGraph graph = builder.forResource("sink-db", Arrays.asList("S", "output"), 1); + + long pipelineCount = graph.nodes().stream() + .filter(n -> n.kind() == GraphNode.Kind.PIPELINE).count(); + assertEquals(1, pipelineCount, "reverse lookup should surface the writer pipeline"); + + boolean hasSinkEdge = graph.edges().stream() + .anyMatch(e -> e.type() == GraphEdge.Type.DEPENDS_ON_SINK); + assertTrue(hasSinkEdge, "writer pipeline should connect to sink via DEPENDS_ON_SINK"); + } + + // ─── Test: depth=0 stops at the root only ───────────────────────────────── + + @Test + void depthZeroForResourceReturnsRootOnly() throws SQLException { + V1alpha1Pipeline pipeline = pipelineWithSourcesAndSink( + "p1", "uid", "Job", + List.of("kafka1_KAFKA.events"), + "ads_VENICE.audience"); + + PipelineGraphBuilder builder = builder( + list(), list(pipeline), list(), list()); + + PipelineGraph graph = builder.forResource("kafka1", Arrays.asList("KAFKA", "events"), 0); + + assertEquals(1, graph.nodes().size(), + "depth=0 should yield only the root resource"); + } + + // ─── Test: cycles in the pipeline graph terminate cleanly ──────────────── + + @Test + void forResourceTerminatesOnCycle() throws SQLException { + // Cycle: P1 reads T1 / writes T2; P2 reads T2 / writes T1. Same resource appears as both + // source and sink along the chain, which would loop forever without the Traversal's + // visited-set guard. Pin that the build still terminates and the rendered graph captures + // the loop as a closed cycle (4 nodes, 4 edges). + V1alpha1Pipeline p1 = pipelineWithSourcesAndSink( + "p1", "uid-job-1", "Job", + List.of("db_A.t1"), + "db_A.t2"); + V1alpha1Pipeline p2 = pipelineWithSourcesAndSink( + "p2", "uid-job-2", "Job", + List.of("db_A.t2"), + "db_A.t1"); + + PipelineGraphBuilder builder = builder( + list(), list(p1, p2), list(), list()); + + // Generous depth to make sure termination comes from the visited-set, not the cap. + PipelineGraph graph = builder.forResource("db", Arrays.asList("A", "t1"), 10); + + // Exactly 4 nodes: T1 (root), P1, T2, P2 — no duplicate node identities. + assertEquals(4, graph.nodes().size(), + "cycle should produce 4 distinct nodes (T1, P1, T2, P2), not infinite expansion"); + + // Exactly 4 edges in the loop: + // T1 → P1 (DEPENDS_ON_SOURCE) + // P1 → T2 (DEPENDS_ON_SINK) + // T2 → P2 (DEPENDS_ON_SOURCE) + // P2 → T1 (DEPENDS_ON_SINK) + assertEquals(4, graph.edges().size(), + "cycle should produce 4 distinct edges, no duplicates from re-expansion"); + + // The second arc closes the loop — P2's sink points back at the root. + boolean cycleCloses = graph.edges().stream() + .anyMatch(e -> e.type() == GraphEdge.Type.DEPENDS_ON_SINK + && e.to().equals(graph.root())); + assertTrue(cycleCloses, "an edge should point back at the root to close the cycle"); + } + + // ─── Test: large depth values are accepted ──────────────────────────────── + + @Test + void largeDepthHandledGracefully() throws SQLException { + V1alpha1Pipeline pipeline = pipelineWithSourcesAndSink( + "p1", "uid", "Job", + List.of("kafka1_KAFKA.events"), + "ads_VENICE.audience"); + + PipelineGraphBuilder builder = builder( + list(), list(pipeline), list(), list()); + + // Depth 999: in practice graphs don't get that deep, so recursion terminates anyway. + PipelineGraph graph = builder.forResource("kafka1", Arrays.asList("KAFKA", "events"), 999); + + assertNotNull(graph); + assertTrue(graph.nodes().size() >= 2, "large depth should still discover at least the pipeline"); + } + + // ─── Test: forView accepts SQL-style identifiers and canonicalizes them ── + + @Test + void forViewCanonicalizesSqlStyleIdentifier() throws SQLException { + // CRD is stored as `venice-test-store-insert-partial`. The user passes the SQL-side identifier + // they typed in their CREATE statement, which has uppercase, dots, and a `$`. + V1alpha1View view = view("ns", "venice-test-store-insert-partial", true, "uid-v"); + + PipelineGraphBuilder builder = builder( + list(view), list(), list(), list()); + + PipelineGraph graph = builder.forView("VENICE.test-store$insert-partial"); + + assertEquals(GraphNode.Kind.VIEW, graph.root().kind()); + assertEquals("venice-test-store-insert-partial", ((GraphNode.View) graph.root()).name(), + "root name should be the canonicalized form"); + } + + // ─── Test: extractJobKind matches only kinds ending in "Job" ───────────── + + @Test + void extractJobKindReturnsTheJobSuffixedKind() { + String yaml = "apiVersion: hoptimator.linkedin.com/v1alpha1\n" + + "kind: KafkaTopic\n" + + "metadata:\n name: foo\n" + + "---\n" + + "apiVersion: flink.apache.org/v1beta1\n" + + "kind: FlinkSessionJob\n" + + "metadata:\n name: foo-job\n"; + + assertEquals("FlinkSessionJob", PipelineGraphBuilder.extractJobKind(yaml), + "should pick the kind whose name ends in 'Job', not any sibling resource"); + } + + @Test + void extractJobKindReturnsNullWhenNoJobSuffixedKindPresent() { + String yaml = "kind: KafkaTopic\n---\nkind: VeniceStore\n---\nkind: ConfigMap\n"; + + assertNull(PipelineGraphBuilder.extractJobKind(yaml), + "no Job-suffixed kind → null (we don't fabricate a job)"); + } + + @Test + void inferEngineNullJobKindReturnsNull() { + assertNull(PipelineGraphBuilder.inferEngine(null, "yaml content")); + } + + @Test + void inferEngineSparkJobReturnsSpark() { + assertEquals("Spark", PipelineGraphBuilder.inferEngine("SparkApplication", null)); + } + + @Test + void inferEngineBeamOnFlinkReturnsFlinkBeam() { + // jobKind hints at Flink, yaml mentions Beam — engine is Flink Beam (Beam pipelines running + // on a Flink runner). + assertEquals("Flink Beam", PipelineGraphBuilder.inferEngine("FlinkSessionJob", + "containers:\n - name: beam-worker\n")); + } + + @Test + void inferEngineBeamOnlyJobReturnsBeam() { + // Beam without a Flink runner declared — fall through to plain Beam. + assertEquals("Beam", PipelineGraphBuilder.inferEngine("BeamApplication", null)); + } + + @Test + void inferEngineUnknownEngineReturnsNull() { + // No flink/beam/spark keyword anywhere — caller renders without an engine line. + assertNull(PipelineGraphBuilder.inferEngine("CustomKind", null)); + } + + @Test + void inferEngineFlinkOnlyKindReturnsFlink() { + // FlinkSessionJob with no Beam mention anywhere — pure Flink. Distinct from the beam-on-flink + // branch, which requires either the kind or the YAML to mention Beam. + assertEquals("Flink", PipelineGraphBuilder.inferEngine("FlinkSessionJob", + "spec:\n parallelism: 4\n")); + } + + @Test + void extractFirstContainerNameReturnsNullForNullOrEmpty() { + assertNull(PipelineGraphBuilder.extractFirstContainerName(null)); + assertNull(PipelineGraphBuilder.extractFirstContainerName("")); + } + + @Test + void extractFirstContainerNameReturnsNullWhenContainersKeyMissing() { + // Trigger YAML without a containers: section (e.g. a Job spec that uses a podTemplate ref). + String yaml = "apiVersion: batch/v1\nkind: Job\nmetadata:\n name: my-job\n"; + assertNull(PipelineGraphBuilder.extractFirstContainerName(yaml)); + } + + @Test + void extractFirstContainerNamePicksFirstEntry() { + // Two containers — return the first one. The trigger renderer surfaces this as the executable + // identity in the trigger label. + String yaml = "spec:\n containers:\n - name: main\n image: foo:bar\n - name: sidecar\n image: baz:qux\n"; + assertEquals("main", PipelineGraphBuilder.extractFirstContainerName(yaml)); + } + + @Test + void extractFirstContainerNameHandlesCompactIndentation() { + // YAML serializers vary on indent — 2-space vs 4-space under containers:. The regex must + // tolerate both rather than silently returning null. + String compact = "containers:\n- name: only\n image: x\n"; + assertEquals("only", PipelineGraphBuilder.extractFirstContainerName(compact)); + } + + @Test + void extractFirstContainerNameSkipsFieldsBeforeContainers() { + // Some other "name:" field appears earlier in the doc (e.g. metadata.name). Restricting the + // search to after the "containers:" marker is what prevents it from being picked up. + String yaml = "metadata:\n name: outer\nspec:\n containers:\n - name: inner\n"; + assertEquals("inner", PipelineGraphBuilder.extractFirstContainerName(yaml)); + } + + @Test + void extractJobKindHandlesPlainBatchJob() { + String yaml = "apiVersion: batch/v1\n" + + "kind: Job\n" + + "metadata:\n name: cron-job\n"; + + assertEquals("Job", PipelineGraphBuilder.extractJobKind(yaml)); + } + + @Test + void extractJobTemplateNameStripsTriggerPrefix() { + // Deployer renders `metadata.name: -