From 70411496b7a2439dcdaac1cf76607452cc95d88b Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Tue, 19 May 2026 17:19:39 -0400 Subject: [PATCH 1/7] Add pipeline graph SPI + Mermaid renderer Introduces the visualization framework consumed by the `!graph` command in a follow-up commit: - `com.linkedin.hoptimator.graph` (in hoptimator-api): GraphNode, GraphEdge, GraphTarget, PipelineGraph data model + GraphProvider and GraphRenderer SPIs. - New `hoptimator-graph` module hosting the Mermaid renderer plus CronHumanizer (renders cron expressions in trigger labels as English via cron-utils CronDescriptor, falling through to the raw cron string on parse failure). The renderer is registered via META-INF/services and discovered by GraphService (added in the next commit). --- .../linkedin/hoptimator/graph/GraphEdge.java | 64 ++++ .../linkedin/hoptimator/graph/GraphNode.java | 226 ++++++++++++++ .../hoptimator/graph/GraphProvider.java | 22 ++ .../hoptimator/graph/GraphRenderer.java | 17 ++ .../hoptimator/graph/GraphTarget.java | 88 ++++++ .../hoptimator/graph/PipelineGraph.java | 44 +++ .../hoptimator/graph/GraphEdgeTest.java | 74 +++++ hoptimator-graph/build.gradle | 54 ++++ .../graph/mermaid/CronHumanizer.java | 64 ++++ .../graph/mermaid/MermaidRenderer.java | 282 ++++++++++++++++++ ...om.linkedin.hoptimator.graph.GraphRenderer | 1 + .../graph/mermaid/CronHumanizerTest.java | 77 +++++ .../graph/mermaid/MermaidRendererTest.java | 241 +++++++++++++++ settings.gradle | 1 + 14 files changed, 1255 insertions(+) create mode 100644 hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphEdge.java create mode 100644 hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphNode.java create mode 100644 hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphProvider.java create mode 100644 hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphRenderer.java create mode 100644 hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphTarget.java create mode 100644 hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/PipelineGraph.java create mode 100644 hoptimator-api/src/test/java/com/linkedin/hoptimator/graph/GraphEdgeTest.java create mode 100644 hoptimator-graph/build.gradle create mode 100644 hoptimator-graph/src/main/java/com/linkedin/hoptimator/graph/mermaid/CronHumanizer.java create mode 100644 hoptimator-graph/src/main/java/com/linkedin/hoptimator/graph/mermaid/MermaidRenderer.java create mode 100644 hoptimator-graph/src/main/resources/META-INF/services/com.linkedin.hoptimator.graph.GraphRenderer create mode 100644 hoptimator-graph/src/test/java/com/linkedin/hoptimator/graph/mermaid/CronHumanizerTest.java create mode 100644 hoptimator-graph/src/test/java/com/linkedin/hoptimator/graph/mermaid/MermaidRendererTest.java diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphEdge.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphEdge.java new file mode 100644 index 00000000..78544374 --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphEdge.java @@ -0,0 +1,64 @@ +package com.linkedin.hoptimator.graph; + +import java.util.Objects; + + +/** + * A directed edge in a pipeline visualization graph. Edges are equal when their endpoints and + * type match — two edges of different types between the same nodes are distinct (e.g. an + * {@code ownerOf} relationship coexists with a {@code dependsOnSink} relationship). + */ +public final class GraphEdge { + + public enum Type { + /** {@code metadata.ownerReferences} cascade — drives subgraph membership, not arrows. */ + OWNER_OF, + /** Resource → pipeline (or job) edge derived from the {@code depends-on} annotation. */ + DEPENDS_ON_SOURCE, + /** Pipeline (or job) → resource edge derived from the {@code depends-on} annotation. */ + DEPENDS_ON_SINK, + /** Trigger → job/pipeline; rendered as a dotted line. */ + TRIGGERS + } + + private final GraphNode from; + private final GraphNode to; + private final Type type; + + public GraphEdge(GraphNode from, GraphNode to, Type type) { + this.from = Objects.requireNonNull(from, "from"); + this.to = Objects.requireNonNull(to, "to"); + this.type = Objects.requireNonNull(type, "type"); + } + + public GraphNode from() { + return from; + } + + public GraphNode to() { + return to; + } + + public Type type() { + return type; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof GraphEdge)) { + return false; + } + GraphEdge other = (GraphEdge) o; + return type == other.type && from.equals(other.from) && to.equals(other.to); + } + + @Override + public int hashCode() { + return Objects.hash(from, to, type); + } + + @Override + public String toString() { + return from.id() + " --" + type + "--> " + to.id(); + } +} diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphNode.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphNode.java new file mode 100644 index 00000000..7de49cb8 --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphNode.java @@ -0,0 +1,226 @@ +package com.linkedin.hoptimator.graph; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Map; +import java.util.Objects; + + +/** + * A node in a pipeline visualization graph. + * + *

Concrete node types are nested final subclasses keyed off a {@link Kind} enum so renderers + * can dispatch on kind without {@code instanceof} chains. Node identity for set/edge purposes is + * determined by {@link #id()} — typically a stable string built from the resource's name, or + * for {@link External} nodes from the database + path identifier. + */ +public abstract class GraphNode { + + public enum Kind { + PIPELINE, + VIEW, + LOGICAL_TABLE, + TRIGGER, + EXTERNAL + } + + private final Kind kind; + private final String id; + + private GraphNode(Kind kind, String id) { + this.kind = Objects.requireNonNull(kind, "kind"); + this.id = Objects.requireNonNull(id, "id"); + } + + public final Kind kind() { + return kind; + } + + public final String id() { + return id; + } + + /** Human-readable label used by renderers as the primary node text. */ + public abstract String displayName(); + + @Override + public final boolean equals(Object o) { + return o instanceof GraphNode && id.equals(((GraphNode) o).id); + } + + @Override + public final int hashCode() { + return id.hashCode(); + } + + @Override + public String toString() { + return kind + "[" + id + "]"; + } + + /** + * A deployed pipeline — the job that materializes a query from sources to a sink. Carries + * optional {@code jobKind} and {@code engine} descriptors extracted from the underlying job + * artifact and surfaced inline in the rendered label so users can tell at a glance whether a + * pipeline runs e.g. on Flink vs a Beam runner. + */ + public static final class Pipeline extends GraphNode { + private final String name; + private final String jobKind; + private final String engine; + + public Pipeline(String name, @Nullable String jobKind, @Nullable String engine) { + super(Kind.PIPELINE, "pipeline:" + name); + this.name = Objects.requireNonNull(name, "name"); + this.jobKind = jobKind; + this.engine = engine; + } + + public String name() { + return name; + } + + /** Type of the underlying job artifact (e.g. {@code FlinkDeployment}). */ + public @Nullable String jobKind() { + return jobKind; + } + + /** Execution engine inferred from the job artifact (e.g. {@code Flink}, {@code Flink Beam}). */ + public @Nullable String engine() { + return engine; + } + + @Override + public String displayName() { + return name; + } + } + + /** A view — regular or materialized. */ + public static final class View extends GraphNode { + private final String name; + private final boolean materialized; + + public View(String name, boolean materialized) { + super(Kind.VIEW, "view:" + name); + this.name = Objects.requireNonNull(name, "name"); + this.materialized = materialized; + } + + public String name() { + return name; + } + + public boolean materialized() { + return materialized; + } + + @Override + public String displayName() { + return name; + } + } + + /** A logical table. Tier names preserve insertion order. */ + public static final class LogicalTable extends GraphNode { + private final String name; + private final Map tiers; + + public LogicalTable(String name, Map tiers) { + super(Kind.LOGICAL_TABLE, "logicaltable:" + name); + this.name = Objects.requireNonNull(name, "name"); + this.tiers = Objects.requireNonNull(tiers, "tiers"); + } + + public String name() { + return name; + } + + /** Tier name → backing database identifier (e.g. {@code nearline → kafka-db}). */ + public Map tiers() { + return tiers; + } + + @Override + public String displayName() { + return name; + } + } + + /** + * A trigger — a scheduled or event-driven job that fires against its target. Carries optional + * rendering hints (container name, job-template binding) pulled from the trigger's underlying + * job declaration. Renderers display whichever hints are present so the user can tell at a + * glance what the underlying job is doing. + */ + public static final class Trigger extends GraphNode { + private final String name; + private final String schedule; + private final boolean paused; + private final String jobTemplateName; + private final String containerName; + + public Trigger(String name, @Nullable String schedule, boolean paused, + @Nullable String jobTemplateName, @Nullable String containerName) { + super(Kind.TRIGGER, "trigger:" + name); + this.name = Objects.requireNonNull(name, "name"); + this.schedule = schedule; + this.paused = paused; + this.jobTemplateName = jobTemplateName; + this.containerName = containerName; + } + + public String name() { + return name; + } + + /** Cron expression; null if the trigger isn't schedule-driven. */ + public @Nullable String schedule() { + return schedule; + } + + public boolean paused() { + return paused; + } + + /** Job-template name this trigger was rendered from (e.g. {@code retl-job-template}); null when unknown. */ + public @Nullable String jobTemplateName() { + return jobTemplateName; + } + + /** First container's name in the rendered job spec. */ + public @Nullable String containerName() { + return containerName; + } + + @Override + public String displayName() { + return name; + } + } + + /** A resource managed outside Hoptimator (Kafka topic, Venice store, MySQL table, etc.). */ + public static final class External extends GraphNode { + private final String database; + private final List path; + + public External(String database, List path) { + super(Kind.EXTERNAL, "external:" + database + "/" + String.join(".", path)); + this.database = Objects.requireNonNull(database, "database"); + this.path = Objects.requireNonNull(path, "path"); + } + + public String database() { + return database; + } + + public List path() { + return path; + } + + @Override + public String displayName() { + return String.join(".", path); + } + } +} diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphProvider.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphProvider.java new file mode 100644 index 00000000..9452d824 --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphProvider.java @@ -0,0 +1,22 @@ +package com.linkedin.hoptimator.graph; + +import java.sql.Connection; +import java.sql.SQLException; + + +/** + * SPI for backends that can materialize a {@link PipelineGraph} for a given {@link GraphTarget}. + * Discovered at runtime via {@code ServiceLoader} — implementations register themselves through + * a {@code META-INF/services/com.linkedin.hoptimator.graph.GraphProvider} file in their module. + */ +public interface GraphProvider { + + /** + * Build a graph for the given target. Should only be called when {@link #supports(GraphTarget)} + * returns {@code true} for the same target. + */ + PipelineGraph forTarget(GraphTarget target, int depth, Connection connection) throws SQLException; + + /** Whether this provider can handle the given target kind. */ + boolean supports(GraphTarget target); +} diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphRenderer.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphRenderer.java new file mode 100644 index 00000000..bb1ab90a --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphRenderer.java @@ -0,0 +1,17 @@ +package com.linkedin.hoptimator.graph; + +/** + * SPI for serializing a {@link PipelineGraph} to a string format (Mermaid, DOT, JSON, etc.). + * Discovered at runtime via {@code ServiceLoader}. + * + *

Each renderer declares its {@link #format()} (e.g. {@code "mermaid"}); the + * {@code GraphService.render(graph, format)} dispatcher picks the matching renderer. + */ +public interface GraphRenderer { + + /** Serialize the graph to a string in this renderer's format. */ + String render(PipelineGraph graph); + + /** Format identifier (e.g. {@code "mermaid"}, {@code "dot"}, {@code "json"}). */ + String format(); +} diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphTarget.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphTarget.java new file mode 100644 index 00000000..d092109b --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/GraphTarget.java @@ -0,0 +1,88 @@ +package com.linkedin.hoptimator.graph; + +import java.util.List; +import java.util.Objects; + + +/** + * The thing a {@link GraphProvider} is asked to materialize a {@link PipelineGraph} around. + * + *

Abstract base + nested final subclasses, one per CLI mode: + *

+ * + *

Providers dispatch on the concrete subclass via {@code instanceof} to route to the right + * internal implementation. Same pattern {@link GraphNode} uses for its kinds. + */ +public abstract class GraphTarget { + + private GraphTarget() { + } + + /** A regular or materialized view, identified by its canonical name. */ + public static final class View extends GraphTarget { + private final String name; + + public View(String name) { + this.name = Objects.requireNonNull(name, "name"); + } + + public String name() { + return name; + } + + @Override + public String toString() { + return "View[" + name + "]"; + } + } + + /** A logical table identified by its canonical name. */ + public static final class LogicalTable extends GraphTarget { + private final String name; + + public LogicalTable(String name) { + this.name = Objects.requireNonNull(name, "name"); + } + + public String name() { + return name; + } + + @Override + public String toString() { + return "LogicalTable[" + name + "]"; + } + } + + /** + * An external resource (Kafka topic, Venice store, MySQL table, etc.) identified by its + * database name plus path components. Used for reverse lookups. + */ + public static final class Resource extends GraphTarget { + private final String database; + private final List path; + + public Resource(String database, List path) { + this.database = Objects.requireNonNull(database, "database"); + this.path = Objects.requireNonNull(path, "path"); + } + + public String database() { + return database; + } + + public List path() { + return path; + } + + @Override + public String toString() { + return "Resource[" + database + "/" + String.join(".", path) + "]"; + } + } +} diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/PipelineGraph.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/PipelineGraph.java new file mode 100644 index 00000000..5ed2a067 --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/graph/PipelineGraph.java @@ -0,0 +1,44 @@ +package com.linkedin.hoptimator.graph; + +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.Objects; +import java.util.Set; + + +/** + * A pipeline visualization graph: a {@link #root()} node the user asked about plus the surrounding + * nodes and edges discovered during traversal. + * + *

The graph is intentionally minimal — discovery lives in backend-specific + * {@link GraphProvider} implementations and rendering lives in {@link GraphRenderer} + * implementations. This POJO is the wire format between them. + */ +public final class PipelineGraph { + + private final GraphNode root; + private final Set nodes; + private final Set edges; + + public PipelineGraph(GraphNode root, Set nodes, Set edges) { + this.root = Objects.requireNonNull(root, "root"); + this.nodes = Collections.unmodifiableSet(new LinkedHashSet<>(Objects.requireNonNull(nodes, "nodes"))); + this.edges = Collections.unmodifiableSet(new LinkedHashSet<>(Objects.requireNonNull(edges, "edges"))); + if (!this.nodes.contains(root)) { + throw new IllegalArgumentException("root node " + root.id() + " is not in the node set"); + } + } + + /** The node the user asked the graph to be built around (the entity highlighted by renderers). */ + public GraphNode root() { + return root; + } + + public Set nodes() { + return nodes; + } + + public Set edges() { + return edges; + } +} diff --git a/hoptimator-api/src/test/java/com/linkedin/hoptimator/graph/GraphEdgeTest.java b/hoptimator-api/src/test/java/com/linkedin/hoptimator/graph/GraphEdgeTest.java new file mode 100644 index 00000000..16195cf0 --- /dev/null +++ b/hoptimator-api/src/test/java/com/linkedin/hoptimator/graph/GraphEdgeTest.java @@ -0,0 +1,74 @@ +package com.linkedin.hoptimator.graph; + +import java.util.Arrays; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +/** + * Equality semantics for {@link GraphEdge}. The graph stores edges in a {@code Set}; if two + * edges with the same endpoints + type aren't equal, we'd end up with duplicate arrows in + * the rendered output. + */ +class GraphEdgeTest { + + private static final GraphNode A = new GraphNode.External("db", Arrays.asList("a")); + private static final GraphNode B = new GraphNode.External("db", Arrays.asList("b")); + + @Test + void equalEdgesShareHashCode() { + GraphEdge e1 = new GraphEdge(A, B, GraphEdge.Type.DEPENDS_ON_SOURCE); + GraphEdge e2 = new GraphEdge(A, B, GraphEdge.Type.DEPENDS_ON_SOURCE); + + assertEquals(e1, e2); + assertEquals(e1.hashCode(), e2.hashCode()); + } + + @Test + void differentTypeMakesEdgesUnequal() { + GraphEdge source = new GraphEdge(A, B, GraphEdge.Type.DEPENDS_ON_SOURCE); + GraphEdge owner = new GraphEdge(A, B, GraphEdge.Type.OWNER_OF); + assertNotEquals(source, owner, + "an OWNER_OF and a DEPENDS_ON_SOURCE between the same nodes should coexist"); + } + + @Test + void differentEndpointsMakeEdgesUnequal() { + GraphEdge ab = new GraphEdge(A, B, GraphEdge.Type.TRIGGERS); + GraphEdge ba = new GraphEdge(B, A, GraphEdge.Type.TRIGGERS); + assertNotEquals(ab, ba, "edges are directed; A→B is distinct from B→A"); + } + + @Test + void notEqualToOtherTypes() { + GraphEdge e = new GraphEdge(A, B, GraphEdge.Type.TRIGGERS); + assertNotEquals(e, "not an edge"); + assertNotEquals(e, null); + } + + @Test + void constructorRejectsNullArguments() { + assertThrows(NullPointerException.class, + () -> new GraphEdge(null, B, GraphEdge.Type.TRIGGERS)); + assertThrows(NullPointerException.class, + () -> new GraphEdge(A, null, GraphEdge.Type.TRIGGERS)); + assertThrows(NullPointerException.class, + () -> new GraphEdge(A, B, null)); + } + + @Test + void toStringMentionsBothEndpointsAndType() { + GraphEdge e = new GraphEdge(A, B, GraphEdge.Type.TRIGGERS); + String s = e.toString(); + assertNotNull(s); + assertTrue(s.contains(A.id()), "toString should include from id: " + s); + assertTrue(s.contains(B.id()), "toString should include to id: " + s); + assertTrue(s.contains("TRIGGERS"), "toString should include type: " + s); + } +} diff --git a/hoptimator-graph/build.gradle b/hoptimator-graph/build.gradle new file mode 100644 index 00000000..a81af0e7 --- /dev/null +++ b/hoptimator-graph/build.gradle @@ -0,0 +1,54 @@ +plugins { + id 'java' + id 'maven-publish' +} + +dependencies { + implementation project(':hoptimator-api') + implementation libs.cron.utils +} + +publishing { + repositories { + maven { + name 'GitHubPackages' + url = 'https://maven.pkg.github.com/linkedin/Hoptimator' + credentials { + username = System.getenv('GITHUB_ACTOR') + password = System.getenv('GITHUB_TOKEN') + } + } + maven { + name 'LinkedInJFrog' + url 'https://linkedin.jfrog.io/artifactory/hoptimator' + credentials { + username = System.getenv('JFROG_USERNAME') + password = System.getenv('JFROG_API_KEY') + } + } + } + publications { + maven(MavenPublication) { + groupId = 'com.linkedin.hoptimator' + artifactId = 'hoptimator-graph' + version = System.getenv('VERSION') + from components.java + pom { + name = 'hoptimator-graph' + description = 'Pipeline graph rendering — SPI-driven providers and renderers' + url = 'https://github.com/linkedin/Hoptimator' + licenses { + license { + name = 'BSD 2-Clause' + url = 'https://raw.githubusercontent.com/linkedin/Hoptimator/main/LICENSE' + } + } + scm { + connection = 'scm:git:git://github.com:linkedin/Hoptimator.git' + developerConnection = 'scm:git:ssh://github.com:linkedin/Hoptimator.git' + url = 'https://github.com/linkedin/Hoptimator' + } + } + } + } +} diff --git a/hoptimator-graph/src/main/java/com/linkedin/hoptimator/graph/mermaid/CronHumanizer.java b/hoptimator-graph/src/main/java/com/linkedin/hoptimator/graph/mermaid/CronHumanizer.java new file mode 100644 index 00000000..de5955ef --- /dev/null +++ b/hoptimator-graph/src/main/java/com/linkedin/hoptimator/graph/mermaid/CronHumanizer.java @@ -0,0 +1,64 @@ +package com.linkedin.hoptimator.graph.mermaid; + +import java.util.Locale; + +import com.cronutils.descriptor.CronDescriptor; +import com.cronutils.model.definition.CronDefinition; +import com.cronutils.model.definition.CronDefinitionBuilder; +import com.cronutils.parser.CronParser; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Renders a cron expression as English prose for the Mermaid Trigger node label. + * + *

Delegates to {@code cron-utils}'s {@link CronDescriptor}, which already understands the + * full grammar (ranges, lists, step values, nicknames). Anything {@link CronParser} rejects — + * malformed input, foreign dialects we don't model — falls back to the raw expression so the + * user still sees something useful instead of a wrong English phrase. + * + *

The cron grammar matches the one {@code TableTriggerReconciler} uses at runtime, so + * what the visualizer accepts and what the operator accepts don't drift. + */ +final class CronHumanizer { + + private static final Logger log = LoggerFactory.getLogger(CronHumanizer.class); + + private static final CronDefinition CRON_DEFINITION = CronDefinitionBuilder.defineCron() + .withMinutes().withValidRange(0, 59).withStrictRange().and() + .withHours().withValidRange(0, 23).withStrictRange().and() + .withDayOfMonth().withValidRange(1, 31).withStrictRange().and() + .withMonth().withValidRange(1, 12).withStrictRange().and() + .withDayOfWeek().withValidRange(0, 7).withMondayDoWValue(1).withIntMapping(7, 0).withStrictRange().and() + .withSupportedNicknameHourly() + .withSupportedNicknameDaily() + .withSupportedNicknameWeekly() + .withSupportedNicknameMonthly() + .withSupportedNicknameYearly() + .withSupportedNicknameAnnually() + .withSupportedNicknameMidnight() + .instance(); + + private static final CronParser PARSER = new CronParser(CRON_DEFINITION); + private static final CronDescriptor DESCRIPTOR = CronDescriptor.instance(Locale.US); + + private CronHumanizer() { + } + + /** Render a cron expression as English prose, or return the input unchanged when parsing fails. */ + static String humanize(String cron) { + if (cron == null || cron.isEmpty()) { + return cron; + } + try { + return DESCRIPTOR.describe(PARSER.parse(cron.trim())); + } catch (RuntimeException e) { + // CronParser throws IllegalArgumentException for unrecognized expressions (Quartz 6-field + // forms, vendor-specific syntax, etc.). Fall back to the raw form rather than swallow the + // user's intent behind a wrong phrase. + log.warn("Unable to parse cron expression={}", cron, e); + return cron; + } + } +} diff --git a/hoptimator-graph/src/main/java/com/linkedin/hoptimator/graph/mermaid/MermaidRenderer.java b/hoptimator-graph/src/main/java/com/linkedin/hoptimator/graph/mermaid/MermaidRenderer.java new file mode 100644 index 00000000..9e0c3a2b --- /dev/null +++ b/hoptimator-graph/src/main/java/com/linkedin/hoptimator/graph/mermaid/MermaidRenderer.java @@ -0,0 +1,282 @@ +package com.linkedin.hoptimator.graph.mermaid; + +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +import com.linkedin.hoptimator.graph.GraphEdge; +import com.linkedin.hoptimator.graph.GraphNode; +import com.linkedin.hoptimator.graph.GraphRenderer; +import com.linkedin.hoptimator.graph.PipelineGraph; + + +/** + * {@link GraphRenderer} that serializes a {@link PipelineGraph} as a Mermaid {@code flowchart} + * string. Registered via {@code META-INF/services/com.linkedin.hoptimator.graph.GraphRenderer} so it's + * discovered through {@link com.linkedin.hoptimator.jdbc.GraphService}. + * + *

Visual encoding: + *

+ * + *

Edges: + *

+ * + *

Orientation: {@code TD} for LogicalTable graphs (with {@code direction LR} inside the LT + * subgraph so inter-tier flows still read left-to-right); {@code LR} otherwise. + */ +public final class MermaidRenderer implements GraphRenderer { + + public static final String FORMAT = "mermaid"; + + @Override + public String format() { + return FORMAT; + } + + @Override + public String render(PipelineGraph graph) { + StringBuilder sb = new StringBuilder(); + sb.append("flowchart ").append(orientation(graph)).append("\n"); + + // Stable mermaid IDs (n0, n1, ...) keyed off node identity. + Map mermaidIds = assignIds(graph); + + // OWNER_OF edges drive subgraph membership; collect them so we can group children inside + // their owner. + Map> ownedChildren = new LinkedHashMap<>(); + Set ownedNodes = new LinkedHashSet<>(); + for (GraphEdge e : graph.edges()) { + if (e.type() == GraphEdge.Type.OWNER_OF) { + ownedChildren.computeIfAbsent(e.from(), k -> new LinkedHashSet<>()).add(e.to()); + ownedNodes.add(e.to()); + } + } + + // Render nodes — owners get a subgraph wrapper, owned children render inside it, free + // nodes render at the top level. + Set rendered = new LinkedHashSet<>(); + for (GraphNode node : graph.nodes()) { + if (rendered.contains(node) || ownedNodes.contains(node)) { + continue; + } + if (node.kind() == GraphNode.Kind.LOGICAL_TABLE) { + renderLogicalTableSubgraph(sb, (GraphNode.LogicalTable) node, ownedChildren, mermaidIds, rendered, " "); + } else if (ownedChildren.containsKey(node)) { + renderOwnerSubgraph(sb, node, ownedChildren, mermaidIds, rendered, " "); + } else { + sb.append(" ").append(renderNode(node, mermaidIds)).append("\n"); + rendered.add(node); + } + } + + // Render arrows (everything except OWNER_OF). + for (GraphEdge e : graph.edges()) { + if (e.type() == GraphEdge.Type.OWNER_OF) { + continue; + } + sb.append(" ").append(mermaidIds.get(e.from())) + .append(arrow(e.type())) + .append(mermaidIds.get(e.to())).append("\n"); + } + return sb.toString(); + } + + // ─── Subgraph rendering ────────────────────────────────────────────────── + + private static void renderLogicalTableSubgraph(StringBuilder sb, GraphNode.LogicalTable lt, + Map> ownedChildren, Map ids, + Set rendered, String indent) { + String ltId = ids.get(lt); + // LT wrapper carries both kind and name — none of the inner tier subgraphs or owned + // children show the LT name, so it has to live here. (Contrast with View wrappers: the + // inner pipeline node duplicates the View name, so the wrapper drops the name.) + sb.append(indent).append("subgraph ").append(ltId).append("[\"LogicalTable ") + .append(escape(lt.displayName())).append("\"]\n"); + sb.append(indent).append(" direction LR\n"); + rendered.add(lt); + + // Group LogicalTable's owned children into tier subgraphs based on the tier-database match. + Map tiers = lt.tiers(); + Map> nodesByTier = new LinkedHashMap<>(); + Set nonTierChildren = new LinkedHashSet<>(); + Set children = ownedChildren.getOrDefault(lt, new LinkedHashSet<>()); + + for (GraphNode child : children) { + String tier = tierFor(child, tiers); + if (tier != null) { + nodesByTier.computeIfAbsent(tier, k -> new LinkedHashSet<>()).add(child); + } else { + nonTierChildren.add(child); + } + } + + // Emit a subgraph per tier, in the order the LogicalTable declares them. + for (String tier : tiers.keySet()) { + Set tierNodes = nodesByTier.get(tier); + if (tierNodes == null) { + continue; + } + sb.append(indent).append(" subgraph ").append(safeId(tier)) + .append("[\"").append(escape(tier)).append("\"]\n"); + for (GraphNode n : tierNodes) { + sb.append(indent).append(" ").append(renderNode(n, ids)).append("\n"); + rendered.add(n); + } + sb.append(indent).append(" end\n"); + } + + // Anything else owned by the LogicalTable (Pipelines, Triggers) — flat inside the wrapper. + for (GraphNode n : nonTierChildren) { + sb.append(indent).append(" ").append(renderNode(n, ids)).append("\n"); + rendered.add(n); + } + + sb.append(indent).append("end\n"); + } + + private static void renderOwnerSubgraph(StringBuilder sb, GraphNode owner, + Map> ownedChildren, Map ids, + Set rendered, String indent) { + // Mermaid rejects "Setting n0 as parent of n0" if a node id collides with the subgraph id. + // The owner is the subgraph (its display name is the subgraph title), so we don't emit it + // as a separate node — same convention LogicalTable already uses. + String ownerId = ids.get(owner); + sb.append(indent).append("subgraph ").append(ownerId).append("[\"") + .append(escape(subgraphTitle(owner))).append("\"]\n"); + rendered.add(owner); + for (GraphNode child : ownedChildren.get(owner)) { + sb.append(indent).append(" ").append(renderNode(child, ids)).append("\n"); + rendered.add(child); + } + sb.append(indent).append("end\n"); + } + + /** + * Subgraph title for an owner. Strips the resource name from {@link GraphNode.View}'s + * display string ("Materialized View foo" → "Materialized View") since the owned pipeline + * node already shows the name inside the subgraph — repeating it on the wrapper is noise. + * Other node kinds fall through to {@link GraphNode#displayName()} unchanged. + */ + private static String subgraphTitle(GraphNode owner) { + if (owner instanceof GraphNode.View) { + return ((GraphNode.View) owner).materialized() ? "Materialized View" : "View"; + } + return owner.displayName(); + } + + private static String tierFor(GraphNode child, Map tiers) { + if (!(child instanceof GraphNode.External)) { + return null; + } + String db = ((GraphNode.External) child).database(); + for (Map.Entry e : tiers.entrySet()) { + if (e.getValue().equals(db)) { + return e.getKey(); + } + } + return null; + } + + // ─── Per-node mermaid syntax ───────────────────────────────────────────── + + private static String renderNode(GraphNode node, Map ids) { + String id = ids.get(node); + switch (node.kind()) { + case EXTERNAL: { + GraphNode.External ext = (GraphNode.External) node; + return id + "[(\"" + ext.displayName() + "\")]"; + } + case PIPELINE: { + GraphNode.Pipeline p = (GraphNode.Pipeline) node; + StringBuilder lbl = new StringBuilder(p.displayName()); + if (p.jobKind() != null) { + lbl.append("
kind: ").append(p.jobKind()); + } + if (p.engine() != null) { + lbl.append("
engine: ").append(p.engine()); + } + return id + "[/\"" + escape(lbl.toString()) + "\"/]"; + } + case VIEW: { + return id + "[\"" + escape(node.displayName()) + "\"]"; + } + case TRIGGER: { + GraphNode.Trigger t = (GraphNode.Trigger) node; + StringBuilder lbl = new StringBuilder(t.displayName()); + if (t.schedule() != null) { + lbl.append("
cron: ").append(CronHumanizer.humanize(t.schedule())); + } + if (t.jobTemplateName() != null) { + lbl.append("
template: ").append(t.jobTemplateName()); + } + if (t.containerName() != null) { + lbl.append("
container: ").append(t.containerName()); + } + if (t.paused()) { + lbl.append("
(paused)"); + } + return id + "{\"" + escape(lbl.toString()) + "\"}"; + } + case LOGICAL_TABLE: + // LogicalTable always renders as a subgraph wrapper, never as a flat node. + return id + "[\"" + escape(node.displayName()) + "\"]"; + default: + return id + "[\"" + escape(node.displayName()) + "\"]"; + } + } + + // ─── Edge syntax ───────────────────────────────────────────────────────── + + private static String arrow(GraphEdge.Type type) { + switch (type) { + case TRIGGERS: + return " -.-> "; + case DEPENDS_ON_SOURCE: + case DEPENDS_ON_SINK: + default: + return " --> "; + } + } + + // ─── Helpers ───────────────────────────────────────────────────────────── + + private static String orientation(PipelineGraph graph) { + return graph.root().kind() == GraphNode.Kind.LOGICAL_TABLE ? "TD" : "LR"; + } + + private static Map assignIds(PipelineGraph graph) { + Map ids = new HashMap<>(); + int i = 0; + for (GraphNode n : graph.nodes()) { + ids.put(n, "n" + i++); + } + return ids; + } + + private static String safeId(String raw) { + StringBuilder sb = new StringBuilder("s_"); + for (char c : raw.toCharArray()) { + sb.append(Character.isLetterOrDigit(c) ? c : '_'); + } + return sb.toString(); + } + + private static String escape(String s) { + if (s == null) { + return ""; + } + return s.replace("\"", """); + } +} diff --git a/hoptimator-graph/src/main/resources/META-INF/services/com.linkedin.hoptimator.graph.GraphRenderer b/hoptimator-graph/src/main/resources/META-INF/services/com.linkedin.hoptimator.graph.GraphRenderer new file mode 100644 index 00000000..43c171e3 --- /dev/null +++ b/hoptimator-graph/src/main/resources/META-INF/services/com.linkedin.hoptimator.graph.GraphRenderer @@ -0,0 +1 @@ +com.linkedin.hoptimator.graph.mermaid.MermaidRenderer diff --git a/hoptimator-graph/src/test/java/com/linkedin/hoptimator/graph/mermaid/CronHumanizerTest.java b/hoptimator-graph/src/test/java/com/linkedin/hoptimator/graph/mermaid/CronHumanizerTest.java new file mode 100644 index 00000000..d4c9ffe5 --- /dev/null +++ b/hoptimator-graph/src/test/java/com/linkedin/hoptimator/graph/mermaid/CronHumanizerTest.java @@ -0,0 +1,77 @@ +package com.linkedin.hoptimator.graph.mermaid; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +/** + * The bulk of the cron-grammar work happens inside {@code cron-utils}, so we don't re-test that + * library here. These cases pin the wrapper contract: standard 5-field expressions get translated + * to prose, and anything the parser rejects (Quartz 6-field, malformed input, null/empty) is + * returned unchanged so the user always sees something useful in the trigger label. + * + *

Substring assertions intentionally — the exact phrasing belongs to cron-utils and shouldn't + * lock us into a specific library version. + */ +class CronHumanizerTest { + + @Test + void everyMinute() { + String out = CronHumanizer.humanize("* * * * *"); + assertTrue(out.toLowerCase().contains("minute"), + "expected a humanized phrase mentioning 'minute', got: " + out); + } + + @Test + void everyNMinutes() { + String out = CronHumanizer.humanize("*/5 * * * *"); + assertTrue(out.contains("5") && out.toLowerCase().contains("minute"), + "expected '5' and 'minute' in: " + out); + } + + @Test + void everyNHours() { + String out = CronHumanizer.humanize("0 */6 * * *"); + assertTrue(out.contains("6") && out.toLowerCase().contains("hour"), + "expected '6' and 'hour' in: " + out); + } + + @Test + void daily() { + String out = CronHumanizer.humanize("0 0 * * *"); + // cron-utils renders this as "at 00:00" — no day-of-week qualifier when DOW is wildcard. + assertTrue(out.contains("00:00"), "expected '00:00' in: " + out); + } + + @Test + void weekly() { + String out = CronHumanizer.humanize("0 3 * * 1"); + assertTrue(out.contains("03:00") && out.toLowerCase().contains("monday"), + "expected '03:00' and 'Monday' in: " + out); + } + + @Test + void unparseableExpressionsFallThroughUnchanged() { + // Quartz 6-field — our CronDefinition is 5-field, so the parser rejects this. + assertEquals("0 0 0 * * ?", CronHumanizer.humanize("0 0 0 * * ?")); + // Garbage input — also rejected. + assertEquals("not a cron", CronHumanizer.humanize("not a cron")); + } + + @Test + void nicknamesParse() { + // CronDefinition opts in to @hourly / @daily / @weekly etc., so they should describe rather + // than fall through. We don't pin exact phrasing. + String hourly = CronHumanizer.humanize("@hourly"); + assertTrue(hourly.toLowerCase().contains("hour"), "expected 'hour' in: " + hourly); + } + + @Test + void nullAndEmptyPassThrough() { + assertNull(CronHumanizer.humanize(null)); + assertEquals("", CronHumanizer.humanize("")); + } +} diff --git a/hoptimator-graph/src/test/java/com/linkedin/hoptimator/graph/mermaid/MermaidRendererTest.java b/hoptimator-graph/src/test/java/com/linkedin/hoptimator/graph/mermaid/MermaidRendererTest.java new file mode 100644 index 00000000..be46949b --- /dev/null +++ b/hoptimator-graph/src/test/java/com/linkedin/hoptimator/graph/mermaid/MermaidRendererTest.java @@ -0,0 +1,241 @@ +package com.linkedin.hoptimator.graph.mermaid; + +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; + +import org.junit.jupiter.api.Test; + +import com.linkedin.hoptimator.graph.GraphEdge; +import com.linkedin.hoptimator.graph.GraphNode; +import com.linkedin.hoptimator.graph.PipelineGraph; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +/** + * Unit tests for {@link MermaidRenderer}. These exercise the rendering rules in isolation + * (hand-built {@link PipelineGraph} fixtures, no K8s involvement). + * + *

Assertions check structural invariants — orientation, subgraph membership, edge styles — + * rather than comparing the entire string verbatim. That keeps the tests resilient to whitespace + * tweaks while pinning down the contracts that drive readability. + */ +class MermaidRendererTest { + + @Test + void rendersMaterializedViewWithLrOrientation() { + GraphNode.View root = new GraphNode.View("audience", true); + GraphNode.External kafka = new GraphNode.External("kafka1", + Arrays.asList("KAFKA", "events")); + GraphNode.External venice = new GraphNode.External("venice-prod", + Arrays.asList("VENICE", "profile")); + GraphNode.Pipeline pipeline = new GraphNode.Pipeline("audience", null, null); + GraphNode.External sink = new GraphNode.External("ads", + Arrays.asList("VENICE", "audience")); + + Set nodes = setOf(root, kafka, venice, pipeline, sink); + Set edges = setOf( + new GraphEdge(root, pipeline, GraphEdge.Type.OWNER_OF), + new GraphEdge(kafka, pipeline, GraphEdge.Type.DEPENDS_ON_SOURCE), + new GraphEdge(venice, pipeline, GraphEdge.Type.DEPENDS_ON_SOURCE), + new GraphEdge(pipeline, sink, GraphEdge.Type.DEPENDS_ON_SINK)); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(root, nodes, edges)); + + assertTrue(mermaid.startsWith("flowchart LR"), + "Materialized view should render top-down LR: " + mermaid); + // OWNER_OF must drive the subgraph wrapper, not an arrow. + assertTrue(!mermaid.contains("--OWNER_OF"), "OWNER_OF must not surface as an arrow"); + // The view's owned pipeline lives inside the View's subgraph wrapper. + assertTrue(mermaid.contains("subgraph "), "owners get subgraph wrappers"); + } + + @Test + void rendersLogicalTableTopDownWithTierSubgraphs() { + Map tiers = new LinkedHashMap<>(); + tiers.put("nearline", "kafka-db"); + tiers.put("online", "venice-db"); + tiers.put("offline", "hdfs-db"); + GraphNode.LogicalTable root = new GraphNode.LogicalTable("foo", tiers); + + GraphNode.External nearline = new GraphNode.External("kafka-db", + Arrays.asList("kafka-db", "foo")); + GraphNode.External online = new GraphNode.External("venice-db", + Arrays.asList("venice-db", "foo")); + GraphNode.External offline = new GraphNode.External("hdfs-db", + Arrays.asList("hdfs-db", "foo")); + GraphNode.Trigger trigger = new GraphNode.Trigger("foo-offline-trigger", + "0 */6 * * *", false, null, null); + + Set nodes = setOf(root, nearline, online, offline, trigger); + Set edges = setOf( + new GraphEdge(root, nearline, GraphEdge.Type.OWNER_OF), + new GraphEdge(root, online, GraphEdge.Type.OWNER_OF), + new GraphEdge(root, offline, GraphEdge.Type.OWNER_OF), + new GraphEdge(root, trigger, GraphEdge.Type.OWNER_OF), + new GraphEdge(trigger, offline, GraphEdge.Type.TRIGGERS)); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(root, nodes, edges)); + + assertTrue(mermaid.startsWith("flowchart TD"), + "LogicalTable should render TD outer, LR inner: " + mermaid); + assertTrue(mermaid.contains("direction LR"), + "LogicalTable subgraph should set inner direction LR: " + mermaid); + // Each tier should produce its own nested subgraph header. + assertTrue(mermaid.contains("nearline"), "nearline tier subgraph missing: " + mermaid); + assertTrue(mermaid.contains("online"), "online tier subgraph missing: " + mermaid); + assertTrue(mermaid.contains("offline"), "offline tier subgraph missing: " + mermaid); + // Trigger arrow uses dotted edge. + assertTrue(mermaid.contains(" -.-> "), "trigger edge should be dotted: " + mermaid); + // Don't pin the exact phrasing — that's cron-utils' contract. Just verify the schedule + // makes it onto the trigger label in humanized form rather than the raw cron string. + assertTrue(mermaid.contains("cron: ") && mermaid.contains("6") && mermaid.contains("hour"), + "trigger label should expose the cron schedule humanized: " + mermaid); + } + + @Test + void rendersTriggerPausedSuffix() { + GraphNode.Trigger trigger = new GraphNode.Trigger("t1", "@hourly", true, null, null); + GraphNode.External target = new GraphNode.External("db", Arrays.asList("path")); + Set nodes = setOf(trigger, target); + Set edges = setOf(new GraphEdge(trigger, target, GraphEdge.Type.TRIGGERS)); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(trigger, nodes, edges)); + + assertTrue(mermaid.contains("(paused)"), + "paused trigger label should be marked: " + mermaid); + } + + @Test + void rendersJobKindAndEngineInsidePipelineLabel() { + // Pipeline carries optional jobKind/engine pulled from the underlying job artifact — those + // surface inline in the Pipeline node label so the user sees the execution shape at a glance. + GraphNode.Pipeline pipe = new GraphNode.Pipeline("p1", "FlinkDeployment", "Flink"); + Set nodes = setOf(pipe); + Set edges = setOf(); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(pipe, nodes, edges)); + + assertTrue(mermaid.contains("kind: FlinkDeployment"), + "pipeline label missing jobKind: " + mermaid); + assertTrue(mermaid.contains("engine: Flink"), + "pipeline label missing engine: " + mermaid); + } + + @Test + void emptyGraphRendersOrientationOnly() { + GraphNode.View root = new GraphNode.View("v1", false); + Set nodes = setOf(root); + Set edges = setOf(); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(root, nodes, edges)); + + // Leaf View renders with just the name — rectangle shape conveys "view-ness". + assertEquals("flowchart LR\n n0[\"v1\"]\n", mermaid); + } + + @Test + void triggerWithoutScheduleOmitsCron() { + // Triggers may be event-driven (no cron). Label should not show a "cron:" prefix in that case. + GraphNode.Trigger trigger = new GraphNode.Trigger("t1", null, false, null, null); + GraphNode.External target = new GraphNode.External("db", Arrays.asList("path")); + Set nodes = setOf(trigger, target); + Set edges = setOf(new GraphEdge(trigger, target, GraphEdge.Type.TRIGGERS)); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(trigger, nodes, edges)); + + assertTrue(!mermaid.contains("cron:"), "trigger without schedule should omit cron line: " + mermaid); + } + + @Test + void ownerSubgraphDoesNotEmitOwnerAsItsOwnChildNode() { + // Mermaid rejects "Setting n0 as parent of n0 would create a cycle" when a subgraph id + // collides with a node id inside it. Pin down the invariant: when an owner gets a subgraph + // wrapper, its mermaid id must appear only as the subgraph header — never as a node line. + GraphNode.View view = new GraphNode.View("audience", true); + GraphNode.Pipeline pipeline = new GraphNode.Pipeline("audience-pipe", null, null); + GraphNode.External sink = new GraphNode.External("ads", Arrays.asList("realized")); + + Set nodes = setOf(view, pipeline, sink); + Set edges = setOf( + new GraphEdge(view, pipeline, GraphEdge.Type.OWNER_OF), + new GraphEdge(pipeline, sink, GraphEdge.Type.DEPENDS_ON_SINK)); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(view, nodes, edges)); + + // n0 should appear once on the subgraph header line and never as a `n0[...]` node line. + int subgraphHeaders = countOccurrences(mermaid, "subgraph n0["); + int nodeLines = countOccurrences(mermaid, " n0[\""); + assertEquals(1, subgraphHeaders, "owner should be the subgraph header exactly once: " + mermaid); + assertEquals(0, nodeLines, "owner must not also appear as a node line inside its own subgraph: " + mermaid); + } + + @Test + void ownerSubgraphWrapsOwnedChildrenForNonLogicalTable() { + // When a non-LogicalTable node owns children (e.g. a View owning its realizing pipeline), + // the owner gets a subgraph wrapper with the children inside. + GraphNode.View view = new GraphNode.View("audience", true); + GraphNode.Pipeline pipeline = new GraphNode.Pipeline("audience-pipe", null, null); + GraphNode.External sink = new GraphNode.External("ads", Arrays.asList("realized")); + + Set nodes = setOf(view, pipeline, sink); + Set edges = setOf( + new GraphEdge(view, pipeline, GraphEdge.Type.OWNER_OF), + new GraphEdge(pipeline, sink, GraphEdge.Type.DEPENDS_ON_SINK)); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(view, nodes, edges)); + + assertTrue(mermaid.contains("subgraph "), + "View owning a pipeline should produce a subgraph wrapper: " + mermaid); + // Subgraph wrapper carries only the kind ("Materialized View") — the inner pipeline node + // already shows the resource name, so repeating it on the wrapper is just noise. + assertTrue(mermaid.contains("[\"Materialized View\"]"), + "View subgraph label should be just the kind without the name: " + mermaid); + assertTrue(mermaid.contains("audience"), + "the name should still appear (on the inner pipeline node): " + mermaid); + } + + @Test + void multipleSourcesAllConnectToTheSamePipeline() { + GraphNode.View root = new GraphNode.View("fanin", true); + GraphNode.Pipeline pipe = new GraphNode.Pipeline("fanin-pipe", null, null); + GraphNode.External a = new GraphNode.External("db1", Arrays.asList("a")); + GraphNode.External b = new GraphNode.External("db2", Arrays.asList("b")); + GraphNode.External c = new GraphNode.External("db3", Arrays.asList("c")); + + Set nodes = setOf(root, pipe, a, b, c); + Set edges = setOf( + new GraphEdge(root, pipe, GraphEdge.Type.OWNER_OF), + new GraphEdge(a, pipe, GraphEdge.Type.DEPENDS_ON_SOURCE), + new GraphEdge(b, pipe, GraphEdge.Type.DEPENDS_ON_SOURCE), + new GraphEdge(c, pipe, GraphEdge.Type.DEPENDS_ON_SOURCE)); + + String mermaid = new MermaidRenderer().render(new PipelineGraph(root, nodes, edges)); + + int arrowCount = countOccurrences(mermaid, " --> "); + assertEquals(3, arrowCount, "three source arrows expected: " + mermaid); + } + + private static int countOccurrences(String haystack, String needle) { + int count = 0; + int idx = 0; + while ((idx = haystack.indexOf(needle, idx)) != -1) { + count++; + idx += needle.length(); + } + return count; + } + + @SafeVarargs + private static Set setOf(T... items) { + Set set = new LinkedHashSet<>(); + for (T item : items) { + set.add(item); + } + return set; + } +} diff --git a/settings.gradle b/settings.gradle index 2da00c0e..cfc11bda 100644 --- a/settings.gradle +++ b/settings.gradle @@ -7,6 +7,7 @@ include 'hoptimator-catalog' // <-- marked for deletion include 'hoptimator-cli' include 'hoptimator-demodb' include 'hoptimator-flink-runner' +include 'hoptimator-graph' include 'hoptimator-jdbc' include 'hoptimator-jdbc-driver' include 'hoptimator-jdbc-driver-int' From 90e94c28717ea1006eaacdd39fde249de51158ce Mon Sep 17 00:00:00 2001 From: Joseph Grogan Date: Tue, 19 May 2026 17:20:14 -0400 Subject: [PATCH 2/7] Add K8s graph provider with SQL identifier resolution Hooks the graph SPI from the previous commit up to a working data path: - GraphService (hoptimator-jdbc): the dispatch entry point. Walks Calcite's schema tree to resolve a user-typed identifier (TWO_LEVEL.NAME or THREE_LEVEL.SCHEMA.NAME) to a GraphTarget, loads GraphProvider and GraphRenderer impls via ServiceLoader, and dispatches. - PipelineGraphBuilder + K8sGraphProvider (hoptimator-k8s): direction-aware traversal of Pipeline / TableTrigger / View references that produces a PipelineGraph rooted at the target. - LogicalTable detection: HoptimatorJdbcSchema.isLogical() lazily walks its downstream Calcite connection to spot any sub-schema tagged with the new LogicalSchemaMarker, so the JDBC layer can surface LogicalTable targets without baking driver-specific URL prefixes or class names into the adapter. LogicalTableSchema implements the marker. isLogical() lives on HoptimatorJdbcSchema rather than the Database SPI to keep Database a thin K8s-CRD surface free of planner concerns. - GraphService resolves identifiers to MaterializedView (view), LogicalTable (marker hit), or Resource (everything else with a HoptimatorJdbcSchema backing) targets; failures throw SQLException with the offending segment in the message rather than silently building a degenerate graph. --- hoptimator-jdbc/build.gradle | 1 + .../hoptimator/jdbc/GraphService.java | 158 +++++ .../hoptimator/jdbc/GraphServiceTest.java | 285 ++++++++ .../hoptimator/k8s/K8sGraphProvider.java | 60 ++ .../hoptimator/k8s/PipelineGraphBuilder.java | 633 +++++++++++++++++ ...om.linkedin.hoptimator.graph.GraphProvider | 1 + .../hoptimator/k8s/K8sGraphProviderTest.java | 85 +++ .../k8s/PipelineGraphBuilderTest.java | 641 ++++++++++++++++++ .../logical/LogicalTableSchema.java | 9 +- .../util/planner/HoptimatorJdbcSchema.java | 42 ++ .../util/planner/LogicalSchemaMarker.java | 23 + .../planner/HoptimatorJdbcSchemaTest.java | 153 +++++ 12 files changed, 2084 insertions(+), 7 deletions(-) create mode 100644 hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/GraphService.java create mode 100644 hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/GraphServiceTest.java create mode 100644 hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sGraphProvider.java create mode 100644 hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineGraphBuilder.java create mode 100644 hoptimator-k8s/src/main/resources/META-INF/services/com.linkedin.hoptimator.graph.GraphProvider create mode 100644 hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sGraphProviderTest.java create mode 100644 hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineGraphBuilderTest.java create mode 100644 hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/LogicalSchemaMarker.java diff --git a/hoptimator-jdbc/build.gradle b/hoptimator-jdbc/build.gradle index 9d06d0e9..62ec7ea0 100644 --- a/hoptimator-jdbc/build.gradle +++ b/hoptimator-jdbc/build.gradle @@ -18,6 +18,7 @@ dependencies { testFixturesImplementation libs.quidem testFixturesImplementation libs.calcite.core testFixturesImplementation project(':hoptimator-api') + testFixturesImplementation project(':hoptimator-graph') testFixturesImplementation project(':hoptimator-util') testFixturesImplementation libs.junit.jupiter.api diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/GraphService.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/GraphService.java new file mode 100644 index 00000000..b0f8aa07 --- /dev/null +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/GraphService.java @@ -0,0 +1,158 @@ +package com.linkedin.hoptimator.jdbc; + +import java.sql.SQLException; +import java.util.Arrays; +import java.util.ArrayList; +import java.util.List; +import java.util.ServiceLoader; +import java.util.stream.Collectors; + +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; + +import com.linkedin.hoptimator.Database; +import com.linkedin.hoptimator.graph.GraphProvider; +import com.linkedin.hoptimator.graph.GraphRenderer; +import com.linkedin.hoptimator.graph.GraphTarget; +import com.linkedin.hoptimator.graph.PipelineGraph; +import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema; +import org.apache.calcite.util.Util; + + +/** + * SPI dispatcher for {@link GraphProvider} (graph construction) and {@link GraphRenderer} + * (graph serialization). Mirrors the {@code DeploymentService} pattern: providers and + * renderers register via {@code META-INF/services}, callers go through static methods on + * this class to discover and dispatch. + */ +public final class GraphService { + + private GraphService() { + } + + /** + * Build a {@link PipelineGraph} for the given target by dispatching to the first registered + * {@link GraphProvider} that {@code supports(target)}. + * + * @throws SQLException if no provider supports the target, or if the underlying provider + * throws. + */ + public static PipelineGraph buildGraph(String identifier, int depth, HoptimatorConnection connection) + throws SQLException { + if (depth < 0) { + throw new SQLException("depth must be non-negative; got: " + depth); + } + GraphTarget target = resolve(identifier, connection); + for (GraphProvider provider : providers()) { + if (provider.supports(target)) { + return provider.forTarget(target, depth, connection); + } + } + throw new SQLException("No GraphProvider supports target: " + target + + ". Registered providers: " + providerSummary()); + } + + /** + * Walks the user-typed SQL identifier through Calcite's schema tree and produces the + * {@link GraphTarget} subtype that matches what was found: + * + *

+ * + *

Throws {@link SQLException} for any path that can't be resolved: a schema segment + * that doesn't exist, a leaf schema that isn't backed by a Hoptimator {@link Database}, or + * a leaf name that no Table in the catalog matches. All three are real "you typed something + * this connection can't see" conditions, distinct from "table exists but has no pipelines." + * + *

LogicalTable detection is schema-level because the outer connection sees LogicalTables + * wrapped as generic JDBC adapter tables — the class identity is erased on the JDBC hop. + * {@code HoptimatorJdbcSchema} lazily walks its downstream connection to find the marker. + */ + static GraphTarget resolve(String identifier, HoptimatorConnection connection) throws SQLException { + List fullPath = Arrays.asList(identifier.split("\\.")); + SchemaPlus schema = connection.calciteConnection().getRootSchema(); + List schemaPath = Util.skipLast(fullPath); + for (String segment : schemaPath) { + SchemaPlus next = schema == null ? null : schema.subSchemas().get(segment); + if (next == null) { + throw new SQLException("Identifier " + identifier + + " does not exist: schema segment '" + segment + "' not found in this connection."); + } + schema = next; + } + + String leafName = Util.last(fullPath); + Table table = schema == null ? null : schema.tables().get(leafName); + if (table == null) { + throw new SQLException("Identifier " + identifier + + " does not exist: table '" + leafName + "' not found in schema " + + String.join(".", schemaPath) + "."); + } + if (table instanceof MaterializedViewTable) { + return new GraphTarget.View(identifier); + } + + HoptimatorJdbcSchema hjs = schema.unwrap(HoptimatorJdbcSchema.class); + if (hjs == null) { + throw new SQLException("Identifier " + identifier + + " does not resolve to a database in this connection."); + } + if (hjs.isLogical()) { + return new GraphTarget.LogicalTable(identifier); + } + return new GraphTarget.Resource(hjs.databaseName(), fullPath); + } + + /** + * Render a graph using the registered {@link GraphRenderer} whose {@link GraphRenderer#format()} + * matches {@code format} (case-insensitive). + * + * @throws IllegalArgumentException if no renderer supports {@code format}. + */ + public static String render(PipelineGraph graph, String format) { + for (GraphRenderer renderer : renderers()) { + if (renderer.format().equalsIgnoreCase(format)) { + return renderer.render(graph); + } + } + throw new IllegalArgumentException("No GraphRenderer registered for format: " + format + + ". Available: " + availableFormats()); + } + + /** Format identifiers (e.g. {@code mermaid}) registered by visible {@link GraphRenderer}s. */ + public static List availableFormats() { + return renderers().stream() + .map(GraphRenderer::format) + .distinct() + .collect(Collectors.toList()); + } + + // ─── SPI loading ───────────────────────────────────────────────────────── + + private static List providers() { + List providers = new ArrayList<>(); + ServiceLoader.load(GraphProvider.class).iterator().forEachRemaining(providers::add); + return providers; + } + + private static List renderers() { + List renderers = new ArrayList<>(); + ServiceLoader.load(GraphRenderer.class).iterator().forEachRemaining(renderers::add); + return renderers; + } + + private static String providerSummary() { + List names = new ArrayList<>(); + for (GraphProvider p : providers()) { + names.add(p.getClass().getSimpleName()); + } + return names.isEmpty() ? "" : String.join(", ", names); + } +} diff --git a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/GraphServiceTest.java b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/GraphServiceTest.java new file mode 100644 index 00000000..010deee3 --- /dev/null +++ b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/GraphServiceTest.java @@ -0,0 +1,285 @@ +package com.linkedin.hoptimator.jdbc; + +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +import org.apache.calcite.jdbc.CalciteConnection; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.schema.impl.ViewTable; +import org.apache.calcite.schema.lookup.Lookup; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.linkedin.hoptimator.graph.GraphNode; +import com.linkedin.hoptimator.graph.GraphTarget; +import com.linkedin.hoptimator.graph.PipelineGraph; +import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mock; + + +/** + * Unit tests for {@link GraphService}. Covers SPI discovery (renderer registration via + * {@code META-INF/services}) and the resolver that walks Calcite's schema tree to figure out + * what kind of entity a SQL identifier refers to. + */ +@ExtendWith(MockitoExtension.class) +class GraphServiceTest { + + @Mock + private HoptimatorConnection connection; + @Mock + private CalciteConnection calciteConnection; + + // ─── SPI discovery ─────────────────────────────────────────────────────── + + @Test + void availableFormatsIncludesMermaid() { + List formats = GraphService.availableFormats(); + assertTrue(formats.contains("mermaid"), + "MermaidRenderer should be discoverable via ServiceLoader: " + formats); + } + + @Test + void renderUnknownFormatThrowsWithHelpfulMessage() { + GraphNode root = new GraphNode.External("db", List.of("t")); + PipelineGraph graph = new PipelineGraph(root, Set.of(root), Set.of()); + + IllegalArgumentException ex = assertThrows(IllegalArgumentException.class, + () -> GraphService.render(graph, "definitely-not-a-format")); + assertTrue(ex.getMessage().contains("definitely-not-a-format"), + "error should mention the unknown format: " + ex.getMessage()); + assertTrue(ex.getMessage().contains("Available"), + "error should list available formats: " + ex.getMessage()); + } + + // ─── Identifier resolution via Calcite ──────────────────────────────────── + + @Test + void resolveTwoLevelResourceFindsDatabaseCrdName() throws SQLException { + // User typed ADS.AD_CLICKS. Schema ADS is a Database in the connection with CRD name ads-database, + // and AD_CLICKS is a real table in its catalog (a plain physical table, not a view / LogicalTable). + SchemaPlus ads = schemaWithDatabaseAndTable("ads-database", "AD_CLICKS", plainTable()); + stubRootSchema(schemaWithSubs("ADS", ads)); + + GraphTarget.Resource out = (GraphTarget.Resource) GraphService.resolve("ADS.AD_CLICKS", connection); + + assertEquals("ads-database", out.database()); + assertEquals(Arrays.asList("ADS", "AD_CLICKS"), out.path()); + } + + @Test + void resolveThreeLevelResourceWalksCatalogAndSchema() throws SQLException { + // User typed MYSQL.testdb.orders. MYSQL is the catalog sub-schema; testdb is the Database; + // orders is a real physical table in the catalog. + SchemaPlus testdb = schemaWithDatabaseAndTable("mysql", "orders", plainTable()); + SchemaPlus mysqlCatalog = schemaWithSubs("testdb", testdb); + stubRootSchema(schemaWithSubs("MYSQL", mysqlCatalog)); + + GraphTarget.Resource out = (GraphTarget.Resource) GraphService.resolve( + "MYSQL.testdb.orders", connection); + + assertEquals("mysql", out.database()); + assertEquals(Arrays.asList("MYSQL", "testdb", "orders"), out.path()); + } + + @Test + void resolveUnknownTableInKnownSchemaThrowsSqlException() { + // Schema ADS resolves to a Database, but no AD_CLICKS table exists in the catalog. + // Distinct from "schema doesn't exist": this is "you typo'd the table name", which is + // different from "table exists but no pipeline references it" (the no-pipelines path, + // which doesn't reach this codepath — it's render-time). + SchemaPlus ads = schemaWithDatabaseAndTable("ads-database", /*tableName=*/null, /*table=*/null); + stubRootSchema(schemaWithSubs("ADS", ads)); + + SQLException ex = assertThrows(SQLException.class, + () -> GraphService.resolve("ADS.AD_CLICKS", connection)); + assertTrue(ex.getMessage().contains("ADS.AD_CLICKS"), + "error should name the offending identifier: " + ex.getMessage()); + assertTrue(ex.getMessage().contains("AD_CLICKS"), + "error should pinpoint the missing table name: " + ex.getMessage()); + } + + @Test + void resolveMaterializedViewProducesViewTarget() throws SQLException { + // The leaf table is a MaterializedViewTable — kind detection returns GraphTarget.View. + MaterializedViewTable mv = new MaterializedViewTable(mock(ViewTable.class)); + SchemaPlus profile = schemaWithDatabaseAndTable("profile-database", "AUDIENCE", mv); + stubRootSchema(schemaWithSubs("PROFILE", profile)); + + GraphTarget out = GraphService.resolve("PROFILE.AUDIENCE", connection); + + GraphTarget.View view = assertInstanceOf(GraphTarget.View.class, out); + assertEquals("PROFILE.AUDIENCE", view.name(), + "View target carries the SQL-side identifier; the builder canonicalizes downstream"); + } + + @Test + void resolveLogicalTableProducesLogicalTableTarget() throws SQLException { + // The schema unwraps to a HoptimatorJdbcSchema whose isLogical() is true — detection runs at + // the schema level because the outer JDBC adapter wraps tables and erases the LogicalTable + // class identity. + SchemaPlus logical = schemaWithDatabaseAndTable("logical-database", "MEMBERS", plainTable(), + /*isLogical=*/true); + stubRootSchema(schemaWithSubs("LOGICAL", logical)); + + GraphTarget out = GraphService.resolve("LOGICAL.MEMBERS", connection); + + GraphTarget.LogicalTable ltTarget = assertInstanceOf(GraphTarget.LogicalTable.class, out); + assertEquals("LOGICAL.MEMBERS", ltTarget.name()); + } + + @Test + void resolveUnknownSchemaSegmentThrowsSqlException() { + // User typed UNKNOWN.FOO. Root has no UNKNOWN sub-schema — bail with a clear error. + stubRootSchema(schemaWithSubs("UNKNOWN", null)); + + SQLException ex = assertThrows(SQLException.class, + () -> GraphService.resolve("UNKNOWN.FOO", connection)); + assertTrue(ex.getMessage().contains("UNKNOWN.FOO"), + "error should name the offending identifier: " + ex.getMessage()); + assertTrue(ex.getMessage().contains("UNKNOWN"), + "error should pinpoint which segment failed: " + ex.getMessage()); + } + + @Test + void resolveSchemaNotBackedByHoptimatorJdbcSchemaThrowsSqlException() { + // The schema resolves AND a leaf table exists, but the schema isn't a Hoptimator JDBC + // schema — e.g. an information_schema with system tables. We can't produce a K8s-side + // identifier, so we error rather than silently building an unresolvable Resource. + SchemaPlus nonHoptimator = schemaWithSubs(null, null); + Lookup tables = mock(Lookup.class); + lenient().doReturn(plainTable()).when(tables).get("TABLE"); + lenient().doReturn(tables).when(nonHoptimator).tables(); + lenient().when(nonHoptimator.unwrap(HoptimatorJdbcSchema.class)).thenReturn(null); + stubRootSchema(schemaWithSubs("INFO", nonHoptimator)); + + SQLException ex = assertThrows(SQLException.class, + () -> GraphService.resolve("INFO.TABLE", connection)); + assertTrue(ex.getMessage().contains("INFO.TABLE"), + "error should name the identifier: " + ex.getMessage()); + assertTrue(ex.getMessage().contains("database"), + "error should explain why: " + ex.getMessage()); + } + + @Test + void renderMatchingFormatDispatchesToRenderer() { + // The happy-path complement of renderUnknownFormatThrows — verifies the renderer is + // actually invoked when format matches (case-insensitive). + GraphNode root = new GraphNode.External("db", List.of("t")); + PipelineGraph graph = new PipelineGraph(root, Set.of(root), Set.of()); + + String out = GraphService.render(graph, "MERMAID"); + assertTrue(out.startsWith("flowchart"), + "Mermaid renderer should be picked case-insensitively and produce a flowchart: " + out); + } + + @Test + void buildGraphSurfacesNoProviderError() throws SQLException { + // No GraphProvider is registered on this module's test classpath, so any resolvable + // identifier reaches the "no provider supports target" branch. Wires through the resolver + // first (identifier must walk Calcite cleanly), then fails dispatch. + SchemaPlus ads = schemaWithDatabaseAndTable("ads-database", "AD_CLICKS", plainTable()); + stubRootSchema(schemaWithSubs("ADS", ads)); + + SQLException ex = assertThrows(SQLException.class, + () -> GraphService.buildGraph("ADS.AD_CLICKS", 1, connection)); + assertTrue(ex.getMessage().contains("No GraphProvider supports"), + "error should explain the dispatch failure: " + ex.getMessage()); + } + + @Test + void buildGraphRejectsNegativeDepth() { + // Negative depths are nonsense — error rather than silently clamping. depth 0 is valid + // (renders just the root); only strictly-negative depths fail. + SQLException ex = assertThrows(SQLException.class, + () -> GraphService.buildGraph("PROFILE.MEMBERS", -1, connection)); + assertTrue(ex.getMessage().contains("depth"), + "error should mention the bad parameter: " + ex.getMessage()); + assertTrue(ex.getMessage().contains("-1"), + "error should echo the bad value: " + ex.getMessage()); + } + + @Test + void resolveSinglePartIdentifierThrowsSqlException() { + // Bare names like "audience" walk no schema segments and hit the root for the table lookup. + // Root has no tables in this fixture → "table not found" with the root's empty schema path. + stubRootSchema(schemaWithSubs(null, null)); + + SQLException ex = assertThrows(SQLException.class, + () -> GraphService.resolve("audience", connection)); + assertTrue(ex.getMessage().contains("audience"), + "error should name the offending identifier: " + ex.getMessage()); + } + + // ─── Mock plumbing ─────────────────────────────────────────────────────── + + /** Plain physical table — not a view, not a LogicalTable. Resolver should treat as Resource. */ + private static Table plainTable() { + return new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory factory) { + return factory.createStructType(List.of(), List.of()); + } + }; + } + + /** Convenience overload: physical (non-logical) database. */ + private static SchemaPlus schemaWithDatabaseAndTable(String crdName, String tableName, Table table) { + return schemaWithDatabaseAndTable(crdName, tableName, table, /*isLogical=*/false); + } + + /** + * Mock a SchemaPlus that's (a) unwrappable as a {@link HoptimatorJdbcSchema} with the given + * CRD name and {@code isLogical()} flag, and (b) when {@code tableName != null}, exposes + * {@code table} via {@code tables().get(tableName)}. + */ + private static SchemaPlus schemaWithDatabaseAndTable(String crdName, String tableName, Table table, + boolean isLogical) { + SchemaPlus schema = schemaWithSubs(null, null); + HoptimatorJdbcSchema hjs = mock(HoptimatorJdbcSchema.class); + lenient().when(hjs.databaseName()).thenReturn(crdName); + lenient().when(hjs.isLogical()).thenReturn(isLogical); + lenient().when(schema.unwrap(HoptimatorJdbcSchema.class)).thenReturn(hjs); + if (tableName != null) { + Lookup tables = mock(Lookup.class); + lenient().doReturn(table).when(tables).get(tableName); + lenient().doReturn(tables).when(schema).tables(); + } + return schema; + } + + private static SchemaPlus schemaWithSubs(String childName, SchemaPlus child) { + Lookup subs = mock(Lookup.class); + if (childName != null) { + lenient().doReturn(child).when(subs).get(childName); + } + // Default tables() to an empty Lookup so resolve.tables().get(...) returns null cleanly + // for schemas that don't host a relevant Table — individual tests can override. + Lookup emptyTables = mock(Lookup.class); + SchemaPlus s = mock(SchemaPlus.class); + lenient().doReturn(subs).when(s).subSchemas(); + lenient().doReturn(emptyTables).when(s).tables(); + return s; + } + + /** Wire {@code connection.calciteConnection().getRootSchema()} to return {@code root}. */ + private void stubRootSchema(SchemaPlus root) { + lenient().when(connection.calciteConnection()).thenReturn(calciteConnection); + lenient().when(calciteConnection.getRootSchema()).thenReturn(root); + } + +} diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sGraphProvider.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sGraphProvider.java new file mode 100644 index 00000000..c1ec6795 --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sGraphProvider.java @@ -0,0 +1,60 @@ +package com.linkedin.hoptimator.k8s; + +import java.sql.Connection; +import java.sql.SQLException; + +import com.linkedin.hoptimator.graph.GraphProvider; +import com.linkedin.hoptimator.graph.GraphTarget; +import com.linkedin.hoptimator.graph.PipelineGraph; + + +/** + * {@link GraphProvider} backed by Hoptimator's K8s state. Wraps {@link PipelineGraphBuilder} — + * the existing entry-point methods become routes for each {@link GraphTarget} subtype: + * + *

+ * + *

Registered via {@code META-INF/services/com.linkedin.hoptimator.graph.GraphProvider} so callers + * reach it through {@link com.linkedin.hoptimator.jdbc.GraphService}. + */ +public class K8sGraphProvider implements GraphProvider { + + @Override + public PipelineGraph forTarget(GraphTarget target, int depth, Connection connection) + throws SQLException { + K8sContext context = K8sContext.create(connection); + PipelineGraphBuilder builder = createBuilder(context); + if (target instanceof GraphTarget.View) { + GraphTarget.View v = (GraphTarget.View) target; + return builder.forView(v.name(), depth); + } + if (target instanceof GraphTarget.LogicalTable) { + GraphTarget.LogicalTable lt = (GraphTarget.LogicalTable) target; + return builder.forLogicalTable(lt.name(), depth); + } + if (target instanceof GraphTarget.Resource) { + GraphTarget.Resource r = (GraphTarget.Resource) target; + return builder.forResource(r.database(), r.path(), depth); + } + throw new SQLException("K8sGraphProvider does not support target: " + target); + } + + @Override + public boolean supports(GraphTarget target) { + return target instanceof GraphTarget.View + || target instanceof GraphTarget.LogicalTable + || target instanceof GraphTarget.Resource; + } + + /** Factory method so tests can substitute a mock builder. */ + PipelineGraphBuilder createBuilder(K8sContext context) { + return new PipelineGraphBuilder(context); + } +} diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineGraphBuilder.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineGraphBuilder.java new file mode 100644 index 00000000..dffbc1a3 --- /dev/null +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/PipelineGraphBuilder.java @@ -0,0 +1,633 @@ +package com.linkedin.hoptimator.k8s; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1OwnerReference; + +import com.linkedin.hoptimator.graph.GraphEdge; +import com.linkedin.hoptimator.graph.GraphNode; +import com.linkedin.hoptimator.graph.PipelineGraph; +import com.linkedin.hoptimator.k8s.models.V1alpha1LogicalTable; +import com.linkedin.hoptimator.k8s.models.V1alpha1LogicalTableList; +import com.linkedin.hoptimator.k8s.models.V1alpha1LogicalTableSpec; +import com.linkedin.hoptimator.k8s.models.V1alpha1LogicalTableSpecTiers; +import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline; +import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerList; +import com.linkedin.hoptimator.k8s.models.V1alpha1View; +import com.linkedin.hoptimator.k8s.models.V1alpha1ViewList; + + +/** + * Builds a {@link PipelineGraph} for visualization by walking K8s CRDs and the + * {@code depends-on-*} dependency index stamped on Pipeline CRDs. + * + *

Three entry points: + *

+ * + *

Depth bounds traversal in both directions through the {@code depends-on-} label + * selector. Callers pick whatever depth they want; the builder only floors negatives at 0 + * so the recursion always terminates. + */ +public final class PipelineGraphBuilder { + + private final K8sApi viewApi; + private final K8sApi pipelineApi; + private final K8sApi logicalTableApi; + private final K8sApi triggerApi; + + public PipelineGraphBuilder(K8sContext context) { + this(new K8sApi<>(context, K8sApiEndpoints.VIEWS), + new K8sApi<>(context, K8sApiEndpoints.PIPELINES), + new K8sApi<>(context, K8sApiEndpoints.LOGICAL_TABLES), + new K8sApi<>(context, K8sApiEndpoints.TABLE_TRIGGERS)); + } + + /** Package-private constructor for tests; accepts pre-built (or fake) K8s APIs. */ + PipelineGraphBuilder(K8sApi viewApi, + K8sApi pipelineApi, + K8sApi logicalTableApi, + K8sApi triggerApi) { + this.viewApi = viewApi; + this.pipelineApi = pipelineApi; + this.logicalTableApi = logicalTableApi; + this.triggerApi = triggerApi; + } + + // ─── Public entry points ───────────────────────────────────────────────── + + public PipelineGraph forView(String name, int depth) throws SQLException { + // Accept SQL-side identifiers (e.g. {@code VENICE.test-store$insert-partial}) — the CRD is + // stored under a canonicalized name (lowercase, {@code _} stripped, {@code $} → {@code -}, + // dot-separated parts joined with {@code -}). Canonicalization is idempotent, so passing the + // already-canonical CRD name still works. + String crdName = K8sUtils.canonicalizeName(Arrays.asList(name.split("\\."))); + V1alpha1View view = viewApi.get(crdName); + boolean materialized = Boolean.TRUE.equals(view.getSpec().getMaterialized()); + GraphNode.View root = new GraphNode.View(crdName, materialized); + + int cappedDepth = cap(depth); + Traversal t = new Traversal(cappedDepth); + t.addNode(root); + + // Materialized views have a Pipeline owned by the View CRD whose sink matches the view's + // canonicalized path. Discover it via owner-reference scan over Pipelines in the namespace, + // and expand each — but do NOT walk further upstream. When the resolver tags an identifier + // as a View, the user expects "what this view does" (its pipeline + the resources it reads + // and writes directly), not a transitive chain to whatever produces its inputs. For the + // chain view, run !graph on the source identifier instead — Resource targets honor depth. + if (materialized) { + String viewUid = view.getMetadata() == null ? null : view.getMetadata().getUid(); + for (V1alpha1Pipeline pipeline : pipelineApi.list()) { + if (ownedBy(pipeline.getMetadata(), "View", crdName, viewUid)) { + GraphNode.Pipeline pipeNode = t.expandPipelineDirected(pipeline, Direction.UPSTREAM, 0); + if (pipeNode != null) { + t.addEdge(new GraphEdge(root, pipeNode, GraphEdge.Type.OWNER_OF)); + } + } + } + } + + return t.build(root); + } + + public PipelineGraph forLogicalTable(String name, int depth) throws SQLException { + // Same canonicalization as forView — accept SQL-side identifiers and resolve to the + // canonicalized CRD name. + String crdName = K8sUtils.canonicalizeName(Arrays.asList(name.split("\\."))); + V1alpha1LogicalTable lt = logicalTableApi.get(crdName); + Map tierMap = tierMap(lt.getSpec()); + GraphNode.LogicalTable root = new GraphNode.LogicalTable(crdName, tierMap); + + Traversal t = new Traversal(cap(depth)); + t.addNode(root); + + String ltUid = lt.getMetadata() == null ? null : lt.getMetadata().getUid(); + String ltName = crdName; + + // Implicit inter-tier pipelines: discovered by scanning all pipelines whose ownerReferences + // point to this LogicalTable. Each gets an OWNER_OF edge from the LogicalTable so the + // renderer nests them inside the LogicalTable subgraph. Pipeline expansion adds the actual + // tier-physical externals — we group those into tier subgraphs in a second pass below. + // Same scoping rule as forView: don't recurse beyond the owned pipelines' immediate + // neighbors. The chain view belongs to !graph on the source identifier (a Resource target). + for (V1alpha1Pipeline pipeline : pipelineApi.list()) { + if (ownedBy(pipeline.getMetadata(), "LogicalTable", ltName, ltUid)) { + GraphNode.Pipeline pipeNode = t.expandPipelineDirected(pipeline, Direction.UPSTREAM, 0); + if (pipeNode != null) { + t.addEdge(new GraphEdge(root, pipeNode, GraphEdge.Type.OWNER_OF)); + } + } + } + + // OWNER_OF any external whose database matches a tier database — that's a tier-physical + // resource of *this* LogicalTable, so the renderer nests it inside the appropriate tier + // subgraph. Externals that aren't tier-physical (e.g. downstream consumers reached during + // recursive expansion) stay outside. + for (GraphNode node : new ArrayList<>(t.nodes())) { + if (node instanceof GraphNode.External + && tierMap.containsValue(((GraphNode.External) node).database())) { + t.addEdge(new GraphEdge(root, node, GraphEdge.Type.OWNER_OF)); + } + } + + // Owned triggers — same scan against the trigger API. Bridging triggers (those with both a + // source and a sink annotation) render as `source -.-> trigger -.-> sink`. Single-node + // triggers (Shape A) render with whichever endpoint is set. + for (V1alpha1TableTrigger trigger : triggerApi.list()) { + if (ownedBy(trigger.getMetadata(), "LogicalTable", ltName, ltUid)) { + GraphNode.Trigger tNode = triggerNode(trigger); + t.addNode(tNode); + t.addEdge(new GraphEdge(root, tNode, GraphEdge.Type.OWNER_OF)); + attachTriggerFlowEdges(t, trigger, tNode); + } + } + + return t.build(root); + } + + public PipelineGraph forResource(String database, List path, int depth) throws SQLException { + GraphNode.External root = externalNode(database, path); + + int cappedDepth = cap(depth); + Traversal t = new Traversal(cappedDepth); + t.addNode(root); + // Reverse-lookup mode: walk in both directions from the root, but each recursion preserves + // its own direction (upstream stays upstream, downstream stays downstream) so unrelated + // siblings of intermediate pipelines don't leak in. + t.expandFromResource(root, cappedDepth, Direction.UPSTREAM); + t.expandFromResource(root, cappedDepth, Direction.DOWNSTREAM); + + return t.build(root); + } + + // ─── Helpers ───────────────────────────────────────────────────────────── + + private static int cap(int depth) { + return Math.max(depth, 0); + } + + private static boolean ownedBy(V1ObjectMeta meta, String ownerKind, String ownerName, String ownerUid) { + if (meta == null || meta.getOwnerReferences() == null) { + return false; + } + for (V1OwnerReference ref : meta.getOwnerReferences()) { + if (!ownerKind.equals(ref.getKind())) { + continue; + } + // UID match is strictest; fall back to name match (matches the dep-guard's relaxation). + if (ownerUid != null && ownerUid.equals(ref.getUid())) { + return true; + } + if (ownerName.equals(ref.getName())) { + return true; + } + } + return false; + } + + private GraphNode.External externalNode(String database, List path) { + return new GraphNode.External(database, path); + } + + private static Map tierMap(V1alpha1LogicalTableSpec spec) { + Map out = new LinkedHashMap<>(); + if (spec == null || spec.getTiers() == null) { + return out; + } + for (Map.Entry e : spec.getTiers().entrySet()) { + out.put(e.getKey(), e.getValue() == null ? null : e.getValue().getDatabase()); + } + return out; + } + + /** + * Pull the workhorse {@code kind:} out of the rendered job YAML inside + * {@code V1alpha1PipelineSpec.yaml}. The yaml stream interleaves data resources (KafkaTopic, + * VeniceStore, etc.) with the actual execution artifact. We pick the first kind whose name + * ends in {@code Job} — covers FlinkSessionJob, FlinkBeamSqlJob, BeamSqlJob, batch/v1 Job, + * etc. without having to maintain an allowlist. Returns null if no Job-suffixed kind is found. + */ + static String extractJobKind(String yaml) { + if (yaml == null || yaml.isEmpty()) { + return null; + } + Matcher m = Pattern.compile("(?m)^kind:\\s*(\\S*Job)\\s*$").matcher(yaml); + return m.find() ? m.group(1) : null; + } + + /** Cheap engine inference from the job kind + raw YAML. Returns null when uncertain. */ + static String inferEngine(String jobKind, String yaml) { + if (jobKind == null) { + return null; + } + String kindLower = jobKind.toLowerCase(); + boolean mentionsBeam = kindLower.contains("beam") + || (yaml != null && yaml.toLowerCase().contains("beam")); + if (mentionsBeam && kindLower.contains("flink")) { + return "Flink Beam"; + } + if (mentionsBeam) { + return "Beam"; + } + if (kindLower.contains("flink")) { + return "Flink"; + } + if (kindLower.contains("spark")) { + return "Spark"; + } + return null; + } + + private static GraphNode.Trigger triggerNode(V1alpha1TableTrigger trigger) { + String name = trigger.getMetadata() == null ? "" : trigger.getMetadata().getName(); + String schedule = trigger.getSpec() == null ? null : trigger.getSpec().getSchedule(); + boolean paused = trigger.getSpec() != null && Boolean.TRUE.equals(trigger.getSpec().getPaused()); + String yaml = trigger.getSpec() == null ? null : trigger.getSpec().getYaml(); + String containerName = extractFirstContainerName(yaml); + String jobTemplate = extractJobTemplateName(name, yaml); + return new GraphNode.Trigger(name, schedule, paused, jobTemplate, containerName); + } + + /** + * Derive the JobTemplate name from the rendered Job's {@code metadata.name}. The deployer + * builds it as {@code -} (both canonicalized), so we strip the + * trigger-name prefix to recover the template name. Returns null if the prefix doesn't match + * (the YAML doesn't follow the convention, or the name isn't present). + */ + static String extractJobTemplateName(String triggerName, String yaml) { + if (triggerName == null || yaml == null || yaml.isEmpty()) { + return null; + } + String jobName = extractMetadataName(yaml); + if (jobName == null) { + return null; + } + String prefix = triggerName + "-"; + if (!jobName.startsWith(prefix) || jobName.length() == prefix.length()) { + return null; + } + return jobName.substring(prefix.length()); + } + + /** Top-level {@code metadata.name:} value in a Kubernetes YAML doc; nullable. */ + static String extractMetadataName(String yaml) { + if (yaml == null || yaml.isEmpty()) { + return null; + } + int metadataIdx = yaml.indexOf("metadata:"); + if (metadataIdx < 0) { + return null; + } + Matcher m = Pattern.compile("(?m)^\\s+name:\\s*(\\S+)\\s*$") + .matcher(yaml.substring(metadataIdx)); + return m.find() ? m.group(1) : null; + } + + /** + * Attach trigger flow edges. Reads the {@code depends-on-sources} and {@code depends-on-sink} + * annotations and emits dotted edges in the data-flow direction: + * {@code source --(dotted)--> trigger --(dotted)--> sink}. If only one endpoint resolves to + * an External in the graph, only that edge is emitted (Shape A). + */ + private static void attachTriggerFlowEdges(PipelineGraphBuilder.Traversal t, + V1alpha1TableTrigger trigger, GraphNode.Trigger tNode) { + String sourceIdentifier = triggerSourceIdentifier(trigger); + if (sourceIdentifier != null) { + GraphNode.External source = t.findExternalByIdentifier(sourceIdentifier); + if (source != null) { + t.addEdge(new GraphEdge(source, tNode, GraphEdge.Type.TRIGGERS)); + } + } + String sinkIdentifier = triggerSinkIdentifier(trigger); + if (sinkIdentifier != null) { + GraphNode.External sink = t.findExternalByIdentifier(sinkIdentifier); + if (sink != null) { + t.addEdge(new GraphEdge(tNode, sink, GraphEdge.Type.TRIGGERS)); + } + } + } + + /** Pull the trigger's source identifier from the {@code depends-on-sources} annotation. */ + private static String triggerSourceIdentifier(V1alpha1TableTrigger trigger) { + String value = annotationValue(trigger, DependencyLabels.ANNOTATION_KEY_SOURCES); + if (value == null || value.isEmpty()) { + return null; + } + Set ids = DependencyLabels.parseAnnotation(value); + return ids.isEmpty() ? null : ids.iterator().next(); + } + + /** Pull the trigger's sink identifier from the {@code depends-on-sinks} annotation. Today a + * trigger has at most one sink, so we return the first; if/when triggers carry multiple sinks + * this needs to grow into an iteration. */ + private static String triggerSinkIdentifier(V1alpha1TableTrigger trigger) { + Set sinks = DependencyLabels.parseAnnotation( + annotationValue(trigger, DependencyLabels.ANNOTATION_KEY_SINKS)); + return sinks.isEmpty() ? null : sinks.iterator().next(); + } + + private static String annotationValue(V1alpha1TableTrigger trigger, String key) { + if (trigger.getMetadata() == null || trigger.getMetadata().getAnnotations() == null) { + return null; + } + return trigger.getMetadata().getAnnotations().get(key); + } + + /** First container's {@code name:} in the {@code containers:} list; nullable. */ + static String extractFirstContainerName(String yaml) { + if (yaml == null || yaml.isEmpty()) { + return null; + } + int containersIdx = yaml.indexOf("containers:"); + if (containersIdx < 0) { + return null; + } + Matcher m = Pattern.compile("(?m)^\\s*-\\s*name:\\s*(\\S+)\\s*$") + .matcher(yaml.substring(containersIdx)); + return m.find() ? m.group(1) : null; + } + + + // ─── Traversal state holder ────────────────────────────────────────────── + + /** + * Direction of recursive expansion through pipelines/triggers. + * + *

+ * + *

Direction stays sticky across one recursive walk, so unrelated siblings of an + * intermediate pipeline don't get pulled in via the opposite direction. + */ + enum Direction { UPSTREAM, DOWNSTREAM } + + private final class Traversal { + private final int maxDepth; + private final Map nodes = new LinkedHashMap<>(); + private final Set edges = new LinkedHashSet<>(); + private final Set expandedResources = new HashSet<>(); + private final Set expandedPipelines = new HashSet<>(); + + Traversal(int maxDepth) { + this.maxDepth = maxDepth; + } + + void addNode(GraphNode node) { + nodes.putIfAbsent(node.id(), node); + } + + void addEdge(GraphEdge edge) { + edges.add(edge); + } + + /** All nodes added so far, in insertion order. */ + Collection nodes() { + return nodes.values(); + } + + PipelineGraph build(GraphNode root) { + return new PipelineGraph(root, new LinkedHashSet<>(nodes.values()), edges); + } + + /** Find an {@link GraphNode.External} matching a {@code _} identifier. */ + GraphNode.External findExternalByIdentifier(String identifier) { + if (identifier == null) { + return null; + } + for (GraphNode n : nodes.values()) { + if (n instanceof GraphNode.External) { + GraphNode.External e = (GraphNode.External) n; + if (identifier.equals(DependencyLabels.identifier(e.database(), e.path()))) { + return e; + } + } + } + return null; + } + + + /** + * Fan out from an external resource in one direction at a time. {@link Direction#UPSTREAM} + * finds pipelines/triggers that *produce* {@code resource} (where it appears as the sink); + * {@link Direction#DOWNSTREAM} finds those that *consume* it (where it appears as a source). + * Recursion stays in the same direction — once you're walking upstream, an intermediate + * pipeline's siblings don't get pulled in via their downstream side. + */ + void expandFromResource(GraphNode.External resource, int remainingDepth, Direction direction) + throws SQLException { + String key = direction.name() + "/" + resource.database() + "/" + String.join(".", resource.path()); + if (!expandedResources.add(key) || remainingDepth <= 0) { + return; + } + String labelKey = DependencyLabels.labelKey(resource.database(), resource.path()); + String identifier = DependencyLabels.identifier(resource.database(), resource.path()); + + Collection matches = pipelineApi.select(labelKey); + for (V1alpha1Pipeline pipeline : matches) { + if (!annotationMentions(pipeline, identifier)) { + // Stale label or hash collision — ignore. + continue; + } + Set sources = parseAnnotation(pipeline, DependencyLabels.ANNOTATION_KEY_SOURCES); + Set sinks = parseAnnotation(pipeline, DependencyLabels.ANNOTATION_KEY_SINKS); + if (direction == Direction.UPSTREAM) { + // Only pipelines that *produce* resource (resource is one of the sinks). Pipelines + // that merely consume resource are downstream — skip them in upstream walks. + if (!sinks.contains(identifier)) { + continue; + } + } else { + // Only pipelines that *consume* resource (resource is one of the sources). + if (!sources.contains(identifier)) { + continue; + } + } + expandPipelineDirected(pipeline, direction, remainingDepth); + } + + // Triggers carry the same `depends-on-` labels. Same direction filter: + // UPSTREAM — trigger's sink == resource (trigger produces resource). + // DOWNSTREAM — trigger's source == resource (trigger consumes resource). + Collection triggerMatches = triggerApi.select(labelKey); + for (V1alpha1TableTrigger trigger : triggerMatches) { + String triggerSource = triggerSourceIdentifier(trigger); + String triggerSink = triggerSinkIdentifier(trigger); + if (!identifier.equals(triggerSource) && !identifier.equals(triggerSink)) { + // Annotation cross-check — skip stale labels and slug collisions. + continue; + } + if (direction == Direction.UPSTREAM && !identifier.equals(triggerSink)) { + continue; + } + if (direction == Direction.DOWNSTREAM && !identifier.equals(triggerSource)) { + continue; + } + GraphNode.Trigger tNode = triggerNode(trigger); + addNode(tNode); + if (direction == Direction.UPSTREAM) { + // Trigger produces resource; arrow points into resource. + addEdge(new GraphEdge(tNode, resource, GraphEdge.Type.TRIGGERS)); + // Walk further upstream from the trigger's source (if any) — that's what feeds + // the trigger. + if (triggerSource != null) { + GraphNode.External srcExt = externalFromIdentifier(triggerSource); + addNode(srcExt); + addEdge(new GraphEdge(srcExt, tNode, GraphEdge.Type.TRIGGERS)); + expandFromResource(srcExt, remainingDepth - 1, Direction.UPSTREAM); + } + } else { + // Trigger consumes resource; arrow points away from resource. + addEdge(new GraphEdge(resource, tNode, GraphEdge.Type.TRIGGERS)); + if (triggerSink != null) { + GraphNode.External sinkExt = externalFromIdentifier(triggerSink); + addNode(sinkExt); + addEdge(new GraphEdge(tNode, sinkExt, GraphEdge.Type.TRIGGERS)); + expandFromResource(sinkExt, remainingDepth - 1, Direction.DOWNSTREAM); + } + } + } + } + + /** + * Ingest a Pipeline CRD into the graph. Reads source/sink annotations, creates edges, + * and schedules transitive expansion from the other endpoints. Returns the Pipeline node + * so the caller can attach owner-of edges. + */ + GraphNode.Pipeline expandPipeline(V1alpha1Pipeline pipeline, GraphNode rootContext) throws SQLException { + String pipeId = pipelineId(pipeline); + if (pipeId == null || !expandedPipelines.add(pipeId)) { + return null; + } + GraphNode.Pipeline pipeNode = pipelineNode(pipeline); + addNode(pipeNode); + + Set sources = parseAnnotation(pipeline, DependencyLabels.ANNOTATION_KEY_SOURCES); + Set sinks = parseAnnotation(pipeline, DependencyLabels.ANNOTATION_KEY_SINKS); + + for (String id : sources) { + GraphNode.External ext = externalFromIdentifier(id); + addNode(ext); + addEdge(new GraphEdge(ext, pipeNode, GraphEdge.Type.DEPENDS_ON_SOURCE)); + } + for (String id : sinks) { + GraphNode.External ext = externalFromIdentifier(id); + addNode(ext); + addEdge(new GraphEdge(pipeNode, ext, GraphEdge.Type.DEPENDS_ON_SINK)); + } + return pipeNode; + } + + /** + * Ingest a pipeline and recursively expand in one direction. The pipeline node is added with + * all its source/sink edges (so the user always sees the pipeline's full shape), but the + * recursion only walks further in {@code direction}: + * + *

+ */ + GraphNode.Pipeline expandPipelineDirected(V1alpha1Pipeline pipeline, Direction direction, + int remainingDepth) throws SQLException { + GraphNode.Pipeline pipeNode = expandPipeline(pipeline, null); + if (pipeNode == null) { + return null; + } + if (remainingDepth <= 0) { + return pipeNode; + } + Set sources = parseAnnotation(pipeline, DependencyLabels.ANNOTATION_KEY_SOURCES); + Set sinks = parseAnnotation(pipeline, DependencyLabels.ANNOTATION_KEY_SINKS); + if (direction == Direction.UPSTREAM) { + for (String id : sources) { + GraphNode.External srcExt = externalFromIdentifier(id); + expandFromResource(srcExt, remainingDepth - 1, Direction.UPSTREAM); + } + } else { + for (String id : sinks) { + GraphNode.External sinkExt = externalFromIdentifier(id); + expandFromResource(sinkExt, remainingDepth - 1, Direction.DOWNSTREAM); + } + } + return pipeNode; + } + + private GraphNode.External externalFromIdentifier(String identifier) throws SQLException { + // Identifier shape: _. Split on the first underscore. + int idx = identifier.indexOf('_'); + if (idx < 0) { + return externalNode(identifier, Collections.emptyList()); + } + String database = identifier.substring(0, idx); + String pathStr = identifier.substring(idx + 1); + List path = pathStr.isEmpty() ? Collections.emptyList() + : Arrays.asList(pathStr.split("\\.")); + return externalNode(database, path); + } + + private GraphNode.Pipeline pipelineNode(V1alpha1Pipeline pipeline) { + String name = pipeline.getMetadata() == null ? "" : pipeline.getMetadata().getName(); + String yaml = pipeline.getSpec() == null ? null : pipeline.getSpec().getYaml(); + String jobKind = extractJobKind(yaml); + String engine = inferEngine(jobKind, yaml); + return new GraphNode.Pipeline(name, jobKind, engine); + } + + private String pipelineId(V1alpha1Pipeline pipeline) { + V1ObjectMeta meta = pipeline.getMetadata(); + if (meta == null) { + return null; + } + return meta.getNamespace() + "/" + meta.getName(); + } + + private Set parseAnnotation(V1alpha1Pipeline pipeline, String key) { + String value = annotationValue(pipeline, key); + return value == null ? Collections.emptySet() : DependencyLabels.parseAnnotation(value); + } + + private String annotationValue(V1alpha1Pipeline pipeline, String key) { + V1ObjectMeta meta = pipeline.getMetadata(); + if (meta == null || meta.getAnnotations() == null) { + return null; + } + return meta.getAnnotations().get(key); + } + + private boolean annotationMentions(V1alpha1Pipeline pipeline, String identifier) { + // Confirm the label-selector match is real — the resource must appear in either the + // sources or sinks annotation. Used to filter out hash collisions and stale labels. + Set sources = parseAnnotation(pipeline, DependencyLabels.ANNOTATION_KEY_SOURCES); + if (sources.contains(identifier)) { + return true; + } + Set sinks = parseAnnotation(pipeline, DependencyLabels.ANNOTATION_KEY_SINKS); + return sinks.contains(identifier); + } + } +} diff --git a/hoptimator-k8s/src/main/resources/META-INF/services/com.linkedin.hoptimator.graph.GraphProvider b/hoptimator-k8s/src/main/resources/META-INF/services/com.linkedin.hoptimator.graph.GraphProvider new file mode 100644 index 00000000..96127e33 --- /dev/null +++ b/hoptimator-k8s/src/main/resources/META-INF/services/com.linkedin.hoptimator.graph.GraphProvider @@ -0,0 +1 @@ +com.linkedin.hoptimator.k8s.K8sGraphProvider diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sGraphProviderTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sGraphProviderTest.java new file mode 100644 index 00000000..ce5ed6af --- /dev/null +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sGraphProviderTest.java @@ -0,0 +1,85 @@ +package com.linkedin.hoptimator.k8s; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.linkedin.hoptimator.graph.GraphTarget; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + + +/** + * Dispatch tests for {@link K8sGraphProvider}. By the time the provider runs, the resolver in + * {@link com.linkedin.hoptimator.jdbc.GraphService} has already turned the user identifier into + * a typed {@link GraphTarget}; the provider's only job is to route each subtype to the right + * {@link PipelineGraphBuilder} entry point and to advertise what it supports. + */ +@ExtendWith(MockitoExtension.class) +class K8sGraphProviderTest { + + @Mock + private MockedStatic contextStatic; + @Mock + private Connection connection; + @Mock + private K8sContext context; + + /** Subclass that hands out a single mock builder so we can verify which entry point fires. */ + private static K8sGraphProvider providerWith(PipelineGraphBuilder builder) { + return new K8sGraphProvider() { + @Override + PipelineGraphBuilder createBuilder(K8sContext context) { + return builder; + } + }; + } + + @Test + void supportsAcceptsAllThreeGraphTargetSubtypes() { + K8sGraphProvider provider = new K8sGraphProvider(); + assertTrue(provider.supports(new GraphTarget.View("audience"))); + assertTrue(provider.supports(new GraphTarget.LogicalTable("members"))); + assertTrue(provider.supports(new GraphTarget.Resource("kafka", List.of("topic")))); + } + + @Test + void forTargetRoutesViewToBuilderForView() throws SQLException { + PipelineGraphBuilder builder = mock(PipelineGraphBuilder.class); + contextStatic.when(() -> K8sContext.create(connection)).thenReturn(context); + + providerWith(builder).forTarget(new GraphTarget.View("audience"), 3, connection); + + verify(builder).forView(eq("audience"), eq(3)); + } + + @Test + void forTargetRoutesLogicalTableToBuilderForLogicalTable() throws SQLException { + PipelineGraphBuilder builder = mock(PipelineGraphBuilder.class); + contextStatic.when(() -> K8sContext.create(connection)).thenReturn(context); + + providerWith(builder).forTarget(new GraphTarget.LogicalTable("members"), 2, connection); + + verify(builder).forLogicalTable(eq("members"), eq(2)); + } + + @Test + void forTargetRoutesResourceToBuilderForResource() throws SQLException { + PipelineGraphBuilder builder = mock(PipelineGraphBuilder.class); + contextStatic.when(() -> K8sContext.create(connection)).thenReturn(context); + + providerWith(builder).forTarget( + new GraphTarget.Resource("ads-database", List.of("ADS", "AD_CLICKS")), 1, connection); + + verify(builder).forResource(eq("ads-database"), eq(List.of("ADS", "AD_CLICKS")), eq(1)); + } +} diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineGraphBuilderTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineGraphBuilderTest.java new file mode 100644 index 00000000..d34c959d --- /dev/null +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/PipelineGraphBuilderTest.java @@ -0,0 +1,641 @@ +package com.linkedin.hoptimator.k8s; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.openapi.models.V1OwnerReference; + +import org.junit.jupiter.api.Test; + +import com.linkedin.hoptimator.graph.GraphEdge; +import com.linkedin.hoptimator.graph.GraphNode; +import com.linkedin.hoptimator.graph.PipelineGraph; +import com.linkedin.hoptimator.k8s.models.V1alpha1LogicalTable; +import com.linkedin.hoptimator.k8s.models.V1alpha1LogicalTableList; +import com.linkedin.hoptimator.k8s.models.V1alpha1LogicalTableSpec; +import com.linkedin.hoptimator.k8s.models.V1alpha1LogicalTableSpecTiers; +import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline; +import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList; +import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineSpec; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerList; +import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerSpec; +import com.linkedin.hoptimator.k8s.models.V1alpha1View; +import com.linkedin.hoptimator.k8s.models.V1alpha1ViewList; +import com.linkedin.hoptimator.k8s.models.V1alpha1ViewSpec; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +/** + * End-to-end tests for {@link PipelineGraphBuilder}. Each test seeds a small fixture of CRDs + * via {@link FakeK8sApi} and exercises one of the three entry points. Assertions probe graph + * structure (node kinds, edge types, ownership wiring) rather than rendered output, which is + * tested separately in {@code MermaidRendererTest}. + */ +class PipelineGraphBuilderTest { + + // ─── Test 1: simple MV with two K8s sources + external sink ────────────── + + @Test + void forViewMaterializedRendersSourcesAndSink() throws SQLException { + V1alpha1View view = view("ads", "audience", true, "view-uid-1"); + V1alpha1Pipeline pipeline = pipelineWithSourcesAndSink( + "audience-pipeline", "view-uid-1", "View", + Arrays.asList("kafka1_KAFKA.events", "venice-prod_VENICE.profile"), + "ads_VENICE.audience"); + + PipelineGraphBuilder builder = builder( + list(view), list(pipeline), list(), list()); + + PipelineGraph graph = builder.forView("audience", 1); + + assertEquals(GraphNode.Kind.VIEW, graph.root().kind()); + // Expected nodes: View root + 1 Pipeline + 2 sources + 1 sink = 5 nodes. + assertEquals(5, graph.nodes().size(), "graph should have view + pipeline + 2 sources + sink"); + + GraphNode pipelineNode = graphNodeByKind(graph, GraphNode.Kind.PIPELINE); + assertNotNull(pipelineNode); + assertTrue(graph.edges().stream().anyMatch(e -> + e.type() == GraphEdge.Type.OWNER_OF + && e.from().equals(graph.root()) + && e.to().equals(pipelineNode)), + "View should own the realizing pipeline"); + + long sourceEdges = graph.edges().stream() + .filter(e -> e.type() == GraphEdge.Type.DEPENDS_ON_SOURCE).count(); + assertEquals(2, sourceEdges, "two source edges expected"); + + long sinkEdges = graph.edges().stream() + .filter(e -> e.type() == GraphEdge.Type.DEPENDS_ON_SINK).count(); + assertEquals(1, sinkEdges, "one sink edge expected"); + } + + // ─── Test 2: MV-on-MV — Calcite inlining, no special-case handling ──────── + + @Test + void forViewMvOnMvRendersOnlyPhysicalSources() throws SQLException { + // CREATE MV A AS SELECT ... FROM kafka.x + // CREATE MV B AS SELECT * FROM A + // After Calcite inlining, B's pipeline reads kafka.x directly. The graph for B should NOT + // mention A — visualization tracks operational truth, not user intent. + V1alpha1View viewB = view("ns", "b", true, "uid-B"); + V1alpha1Pipeline pipeB = pipelineWithSourcesAndSink( + "B-pipeline", "uid-B", "View", + Arrays.asList("kafka1_KAFKA.x"), + "ns_VENICE.B"); + + PipelineGraphBuilder builder = builder( + list(view("ns", "a", true, "uid-A"), viewB), + list(pipeB), + list(), list()); + + PipelineGraph graph = builder.forView("b", 1); + + boolean mentionsA = graph.nodes().stream().anyMatch(n -> n.id().endsWith("/a")); + assertFalse(mentionsA, "MV-on-MV should not surface upstream View 'A' — only physical sources"); + assertTrue(graph.nodes().stream().anyMatch(n -> + n instanceof GraphNode.External && ((GraphNode.External) n).database().equals("kafka1")), + "MV-on-MV should surface inlined kafka source"); + } + + // ─── Test 2b: forView is scoped to the view's neighborhood ──────────────── + + @Test + void forViewIsScopedToTheViewNeighborhood() throws SQLException { + // Mirrors the real-world bug. Two MVs share a table as their connection point: + // MV "P" reads from ads.ad_clicks and writes to ads.page_views. + // MV "A" reads from ads.page_views (and profile.members) and writes to ads.audience. + // forView on A should show A's own pipeline + sources + sink, nothing else: + // - No P-pipeline (upstream — chain belongs to Resource targets via !graph on the source). + // - No ads.ad_clicks (P's source, not A's direct neighbor). + // forView on P should show P's own pipeline + sources + sink, nothing else: + // - No A-pipeline (downstream of P's sink). + // - No profile.members / ads.audience. + V1alpha1View viewP = view("ads", "p", true, "uid-P"); + V1alpha1Pipeline pPipe = pipelineWithSourcesAndSink( + "p-pipeline", "uid-P", "View", + Arrays.asList("ads-database_ADS.AD_CLICKS"), + "ads-database_ADS.PAGE_VIEWS"); + + V1alpha1View viewA = view("ads", "a", true, "uid-A"); + V1alpha1Pipeline aPipe = pipelineWithSourcesAndSink( + "a-pipeline", "uid-A", "View", + Arrays.asList("ads-database_ADS.PAGE_VIEWS", "profile-database_PROFILE.MEMBERS"), + "ads-database_ADS.AUDIENCE"); + + PipelineGraphBuilder builder = builder( + list(viewP, viewA), + list(pPipe, aPipe), + list(), list()); + + // forView P — must NOT include downstream A or its surroundings. + PipelineGraph forP = builder.forView("p", 2); + boolean pHasOwn = forP.nodes().stream().anyMatch(n -> + n instanceof GraphNode.Pipeline && "p-pipeline".equals(((GraphNode.Pipeline) n).name())); + boolean pHasA = forP.nodes().stream().anyMatch(n -> + n instanceof GraphNode.Pipeline && "a-pipeline".equals(((GraphNode.Pipeline) n).name())); + assertTrue(pHasOwn, "P's own pipeline should appear in its graph"); + assertFalse(pHasA, "A's pipeline is downstream of P's sink — must not appear"); + assertFalse(forP.nodes().stream().anyMatch(n -> + n instanceof GraphNode.External + && "profile-database".equals(((GraphNode.External) n).database())), + "profile.members only feeds the downstream MV — must not appear"); + + // forView A — must NOT include upstream P or ads.ad_clicks. The chain view is the job of + // a Resource-target query (!graph on the source identifier). A's graph is A's pipeline + + // its direct sources (PAGE_VIEWS, MEMBERS as leaves) + its sink (AUDIENCE as a leaf). + PipelineGraph forA = builder.forView("a", 2); + boolean aHasOwn = forA.nodes().stream().anyMatch(n -> + n instanceof GraphNode.Pipeline && "a-pipeline".equals(((GraphNode.Pipeline) n).name())); + boolean aHasP = forA.nodes().stream().anyMatch(n -> + n instanceof GraphNode.Pipeline && "p-pipeline".equals(((GraphNode.Pipeline) n).name())); + assertTrue(aHasOwn, "A's own pipeline should appear in its graph"); + assertFalse(aHasP, "P's pipeline is upstream of A's source — must not chain into A's view graph"); + assertFalse(forA.nodes().stream().anyMatch(n -> + n instanceof GraphNode.External + && ((GraphNode.External) n).path().contains("AD_CLICKS")), + "ads.ad_clicks is upstream of A's source PAGE_VIEWS — must not chain into A's view graph"); + } + + // ─── Test 3: LogicalTable with three tiers + offline trigger ───────────── + + @Test + void forLogicalTableRendersTiersAndOwnedPipelinesAndTrigger() throws SQLException { + V1alpha1LogicalTable lt = logicalTable("ns", "events", "uid-lt", linked( + "nearline", "kafka-db", + "online", "venice-db", + "offline", "hdfs-db")); + + // Implicit nearline → online pipeline owned by the LogicalTable. + V1alpha1Pipeline n2o = pipelineWithSourcesAndSink( + "events-nearline-online", "uid-lt", "LogicalTable", + Arrays.asList("kafka-db_kafka-db.events"), + "venice-db_venice-db.events"); + V1alpha1Pipeline n2f = pipelineWithSourcesAndSink( + "events-nearline-offline", "uid-lt", "LogicalTable", + Arrays.asList("kafka-db_kafka-db.events"), + "hdfs-db_hdfs-db.events"); + // Bridging trigger: reads from offline (source) and writes to online (sink), reverse-ETL. + V1alpha1TableTrigger trigger = trigger("ns", "events-offline-trigger", + "uid-lt", "0 */6 * * *", + "hdfs-db_hdfs-db.events", // source (offline) + "venice-db_venice-db.events"); // sink (online) + + PipelineGraphBuilder builder = builder( + list(), + list(n2o, n2f), + list(lt), + list(trigger)); + + PipelineGraph graph = builder.forLogicalTable("events", 1); + + assertEquals(GraphNode.Kind.LOGICAL_TABLE, graph.root().kind()); + long pipelineCount = graph.nodes().stream() + .filter(n -> n.kind() == GraphNode.Kind.PIPELINE).count(); + assertEquals(2, pipelineCount, "two implicit pipelines expected"); + + long triggerCount = graph.nodes().stream() + .filter(n -> n.kind() == GraphNode.Kind.TRIGGER).count(); + assertEquals(1, triggerCount, "offline trigger expected"); + + long triggerEdges = graph.edges().stream() + .filter(e -> e.type() == GraphEdge.Type.TRIGGERS).count(); + assertEquals(2, triggerEdges, + "bridging trigger should produce two TRIGGERS edges: source→trigger and trigger→sink"); + + // LT now owns whatever externals match a tier database, plus the 2 pipelines + 1 trigger. + // Three tier-physical externals come from the pipelines (one per tier db) plus the 2 + // pipelines + 1 trigger = 6 OWNER_OF edges. + long ownerEdges = graph.edges().stream() + .filter(e -> e.type() == GraphEdge.Type.OWNER_OF + && e.from().equals(graph.root())).count(); + assertEquals(6, ownerEdges, + "LogicalTable should own its tier externals + 2 pipelines + 1 trigger; got: " + ownerEdges); + } + + // ─── Test: multiple triggers on a logical table — both are surfaced ────── + + @Test + void forLogicalTableSurfacesMultipleTriggers() throws SQLException { + V1alpha1LogicalTable lt = logicalTable("ns", "events", "uid-lt", linked( + "nearline", "kafka-db", + "offline", "hdfs-db")); + + // Both triggers reference the same offline-tier identifier so the graph connects both. + // We add a pipeline writing to that identifier so the External actually exists in the graph + // (otherwise the trigger has nothing to connect to). + V1alpha1Pipeline writer = pipelineWithSourcesAndSink( + "events-nearline-offline", "uid-lt", "LogicalTable", + Arrays.asList("kafka-db_kafka-db.events"), + "hdfs-db_hdfs-db.events"); + V1alpha1TableTrigger fast = trigger("ns", "events-fast", "uid-lt", + "*/5 * * * *", "hdfs-db_hdfs-db.events", null); + V1alpha1TableTrigger slow = trigger("ns", "events-slow", "uid-lt", + "0 0 * * *", "hdfs-db_hdfs-db.events", null); + + PipelineGraphBuilder builder = builder( + list(), list(writer), list(lt), list(fast, slow)); + + PipelineGraph graph = builder.forLogicalTable("events", 0); + + long triggerNodes = graph.nodes().stream() + .filter(n -> n.kind() == GraphNode.Kind.TRIGGER).count(); + assertEquals(2, triggerNodes, "both triggers should be in the graph"); + + long triggerEdges = graph.edges().stream() + .filter(e -> e.type() == GraphEdge.Type.TRIGGERS).count(); + assertEquals(2, triggerEdges, "both TRIGGERS edges should exist"); + } + + // ─── Test: trigger pointing at an arbitrary table (not LogicalTable-owned) ─ + + @Test + void forResourceTraversesIntoTriggerTarget() throws SQLException { + // A standalone trigger that fires a pipeline writing to an arbitrary external table. + // forResource on that table should still surface the connected pipeline; the trigger itself + // doesn't live in this graph (it's not owned by the resource), but the pipeline does. + V1alpha1Pipeline pipeline = pipelineWithSourcesAndSink( + "etl-pipeline", "uid-job", "Job", + Arrays.asList("source-db_S.input"), + "sink-db_S.output"); + + PipelineGraphBuilder builder = builder( + list(), list(pipeline), list(), list()); + + PipelineGraph graph = builder.forResource("sink-db", Arrays.asList("S", "output"), 1); + + long pipelineCount = graph.nodes().stream() + .filter(n -> n.kind() == GraphNode.Kind.PIPELINE).count(); + assertEquals(1, pipelineCount, "reverse lookup should surface the writer pipeline"); + + boolean hasSinkEdge = graph.edges().stream() + .anyMatch(e -> e.type() == GraphEdge.Type.DEPENDS_ON_SINK); + assertTrue(hasSinkEdge, "writer pipeline should connect to sink via DEPENDS_ON_SINK"); + } + + // ─── Test: depth=0 stops at the root only ───────────────────────────────── + + @Test + void depthZeroForResourceReturnsRootOnly() throws SQLException { + V1alpha1Pipeline pipeline = pipelineWithSourcesAndSink( + "p1", "uid", "Job", + Arrays.asList("kafka1_KAFKA.events"), + "ads_VENICE.audience"); + + PipelineGraphBuilder builder = builder( + list(), list(pipeline), list(), list()); + + PipelineGraph graph = builder.forResource("kafka1", Arrays.asList("KAFKA", "events"), 0); + + assertEquals(1, graph.nodes().size(), + "depth=0 should yield only the root resource"); + } + + // ─── Test: cycles in the pipeline graph terminate cleanly ──────────────── + + @Test + void forResourceTerminatesOnCycle() throws SQLException { + // Cycle: P1 reads T1 / writes T2; P2 reads T2 / writes T1. Same resource appears as both + // source and sink along the chain, which would loop forever without the Traversal's + // visited-set guard. Pin that the build still terminates and the rendered graph captures + // the loop as a closed cycle (4 nodes, 4 edges). + V1alpha1Pipeline p1 = pipelineWithSourcesAndSink( + "p1", "uid-job-1", "Job", + Arrays.asList("db_A.t1"), + "db_A.t2"); + V1alpha1Pipeline p2 = pipelineWithSourcesAndSink( + "p2", "uid-job-2", "Job", + Arrays.asList("db_A.t2"), + "db_A.t1"); + + PipelineGraphBuilder builder = builder( + list(), list(p1, p2), list(), list()); + + // Generous depth to make sure termination comes from the visited-set, not the cap. + PipelineGraph graph = builder.forResource("db", Arrays.asList("A", "t1"), 10); + + // Exactly 4 nodes: T1 (root), P1, T2, P2 — no duplicate node identities. + assertEquals(4, graph.nodes().size(), + "cycle should produce 4 distinct nodes (T1, P1, T2, P2), not infinite expansion"); + + // Exactly 4 edges in the loop: + // T1 → P1 (DEPENDS_ON_SOURCE) + // P1 → T2 (DEPENDS_ON_SINK) + // T2 → P2 (DEPENDS_ON_SOURCE) + // P2 → T1 (DEPENDS_ON_SINK) + assertEquals(4, graph.edges().size(), + "cycle should produce 4 distinct edges, no duplicates from re-expansion"); + + // The second arc closes the loop — P2's sink points back at the root. + boolean cycleCloses = graph.edges().stream() + .anyMatch(e -> e.type() == GraphEdge.Type.DEPENDS_ON_SINK + && e.to().equals(graph.root())); + assertTrue(cycleCloses, "an edge should point back at the root to close the cycle"); + } + + // ─── Test: large depth values are accepted ──────────────────────────────── + + @Test + void largeDepthHandledGracefully() throws SQLException { + V1alpha1Pipeline pipeline = pipelineWithSourcesAndSink( + "p1", "uid", "Job", + Arrays.asList("kafka1_KAFKA.events"), + "ads_VENICE.audience"); + + PipelineGraphBuilder builder = builder( + list(), list(pipeline), list(), list()); + + // Depth 999: in practice graphs don't get that deep, so recursion terminates anyway. + PipelineGraph graph = builder.forResource("kafka1", Arrays.asList("KAFKA", "events"), 999); + + assertNotNull(graph); + assertTrue(graph.nodes().size() >= 2, "large depth should still discover at least the pipeline"); + } + + // ─── Test: forView accepts SQL-style identifiers and canonicalizes them ── + + @Test + void forViewCanonicalizesSqlStyleIdentifier() throws SQLException { + // CRD is stored as `venice-test-store-insert-partial`. The user passes the SQL-side identifier + // they typed in their CREATE statement, which has uppercase, dots, and a `$`. + V1alpha1View view = view("ns", "venice-test-store-insert-partial", true, "uid-v"); + + PipelineGraphBuilder builder = builder( + list(view), list(), list(), list()); + + PipelineGraph graph = builder.forView("VENICE.test-store$insert-partial", 0); + + assertEquals(GraphNode.Kind.VIEW, graph.root().kind()); + assertEquals("venice-test-store-insert-partial", ((GraphNode.View) graph.root()).name(), + "root name should be the canonicalized form"); + } + + // ─── Test: extractJobKind matches only kinds ending in "Job" ───────────── + + @Test + void extractJobKindReturnsTheJobSuffixedKind() { + String yaml = "" + + "apiVersion: hoptimator.linkedin.com/v1alpha1\n" + + "kind: KafkaTopic\n" + + "metadata:\n name: foo\n" + + "---\n" + + "apiVersion: flink.apache.org/v1beta1\n" + + "kind: FlinkSessionJob\n" + + "metadata:\n name: foo-job\n"; + + assertEquals("FlinkSessionJob", PipelineGraphBuilder.extractJobKind(yaml), + "should pick the kind whose name ends in 'Job', not any sibling resource"); + } + + @Test + void extractJobKindReturnsNullWhenNoJobSuffixedKindPresent() { + String yaml = "kind: KafkaTopic\n---\nkind: VeniceStore\n---\nkind: ConfigMap\n"; + + assertNull(PipelineGraphBuilder.extractJobKind(yaml), + "no Job-suffixed kind → null (we don't fabricate a job)"); + } + + @Test + void inferEngineNullJobKindReturnsNull() { + assertNull(PipelineGraphBuilder.inferEngine(null, "yaml content")); + } + + @Test + void inferEngineSparkJobReturnsSpark() { + assertEquals("Spark", PipelineGraphBuilder.inferEngine("SparkApplication", null)); + } + + @Test + void inferEngineBeamOnFlinkReturnsFlinkBeam() { + // jobKind hints at Flink, yaml mentions Beam — engine is Flink Beam (Beam pipelines running + // on a Flink runner). + assertEquals("Flink Beam", PipelineGraphBuilder.inferEngine("FlinkSessionJob", + "containers:\n - name: beam-worker\n")); + } + + @Test + void inferEngineBeamOnlyJobReturnsBeam() { + // Beam without a Flink runner declared — fall through to plain Beam. + assertEquals("Beam", PipelineGraphBuilder.inferEngine("BeamApplication", null)); + } + + @Test + void inferEngineUnknownEngineReturnsNull() { + // No flink/beam/spark keyword anywhere — caller renders without an engine line. + assertNull(PipelineGraphBuilder.inferEngine("CustomKind", null)); + } + + @Test + void extractJobKindHandlesPlainBatchJob() { + String yaml = "" + + "apiVersion: batch/v1\n" + + "kind: Job\n" + + "metadata:\n name: cron-job\n"; + + assertEquals("Job", PipelineGraphBuilder.extractJobKind(yaml)); + } + + @Test + void extractJobTemplateNameStripsTriggerPrefix() { + // Deployer renders `metadata.name: -