From 357968c866a651df64d671892af951252fd38262 Mon Sep 17 00:00:00 2001
From: himmi-01
Date: Sat, 23 May 2026 13:28:00 -0700
Subject: [PATCH] feat(graph): add CLI export command to output service graph as Mermaid or JSON

---
Reviewer notes (not part of the commit message):
* This copy was re-flowed from a whitespace-collapsed patch. Indentation of
  the context lines and leading whitespace inside string literals are
  reconstructed and should be checked against the tree before applying.
* The blob ids on the "index" lines predate the review changes below.

 corbell/cli/commands/graph.py      |  28 +++++
 corbell/core/graph/schema.py       |  10 ++
 corbell/core/graph/sqlite_store.py | 161 +++++++++++++++++++++++++++++
 tests/test_graph_export.py         | 141 +++++++++++++++++++++++++
 4 files changed, 340 insertions(+)
 create mode 100644 tests/test_graph_export.py

diff --git a/corbell/cli/commands/graph.py b/corbell/cli/commands/graph.py
index dc64879..f1e9b33 100644
--- a/corbell/cli/commands/graph.py
+++ b/corbell/cli/commands/graph.py
@@ -206,3 +206,31 @@ def graph_callpath(
     console.print(f"[green]Found {len(paths)} path(s):[/green]")
     for i, path in enumerate(paths, 1):
         console.print(f"\n Path {i}: {' → '.join(path)}")
+
+
+@app.command("export")
+def graph_export(
+    workspace: Optional[Path] = typer.Option(None, "--workspace", "-w", help="Workspace directory."),
+    output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output file path (default: stdout)."),
+    format: str = typer.Option("mermaid", "--format", "-f", help="Output format: mermaid | json"),
+):
+    """Export the service graph to Mermaid or JSON format.
+
+    Writes to ``--output`` when given, otherwise to stdout. Exits with
+    status 1 when ``--format`` is neither ``mermaid`` nor ``json``.
+    """
+    # Reject a bad format before touching the workspace or the database.
+    if format not in ("mermaid", "json"):
+        console.print(f"[red]Unsupported format: {format}. Must be 'mermaid' or 'json'.[/red]")
+        raise typer.Exit(1)
+
+    cfg, config_dir = _load(workspace)
+    store = _get_store(cfg, config_dir)
+    content = store.to_mermaid() if format == "mermaid" else store.to_json()
+
+    if output:
+        output.write_text(content, encoding="utf-8")
+        console.print(f"[green]✓ Exported service graph to {output}[/green]")
+    else:
+        # Plain print: the export contains square brackets that must reach stdout verbatim.
+        print(content)
diff --git a/corbell/core/graph/schema.py b/corbell/core/graph/schema.py
index 951cfa1..c57ab94 100644
--- a/corbell/core/graph/schema.py
+++ b/corbell/core/graph/schema.py
@@ -159,6 +159,16 @@ def get_all_nodes_summary(self) -> Dict[str, Any]:
         """Return a summary of all nodes and edges for display."""
         ...
 
+    @abstractmethod
+    def to_mermaid(self) -> str:
+        """Return the service graph as Mermaid diagram source."""
+        ...
+
+    @abstractmethod
+    def to_json(self) -> str:
+        """Return the service graph serialized as a JSON string."""
+        ...
+
     @abstractmethod
     def clear(self) -> None:
         """Remove all data from the store."""
diff --git a/corbell/core/graph/sqlite_store.py b/corbell/core/graph/sqlite_store.py
index 49497a2..fa63665 100644
--- a/corbell/core/graph/sqlite_store.py
+++ b/corbell/core/graph/sqlite_store.py
@@ -295,6 +295,167 @@ def get_all_nodes_summary(self) -> Dict[str, Any]:
             "edges": edge_count,
         }
 
+    # Edge kinds left out of the Mermaid diagram.
+    _MERMAID_SKIPPED_KINDS = frozenset(
+        {"method_call", "flow_step", "git_coupling", "flow_link"}
+    )
+    # Arrow (with label) drawn for each known edge kind; others get a bare arrow.
+    _MERMAID_ARROWS = {
+        "http_call": "-- HTTP -->",
+        "rpc_call": "-- RPC/Edge Function -->",
+        "db_read": "-- Reads -->",
+        "db_write": "-- Writes -->",
+        "queue_publish": "-- Publishes -->",
+        "queue_consume": "-- Consumes -->",
+        "library_dependency": "-. Import/Library .->",
+    }
+
+    @staticmethod
+    def _mermaid_id(raw_id: str) -> str:
+        """Return *raw_id* with every character outside ``[A-Za-z0-9_]`` replaced by ``_``.
+
+        For example ``ds:db`` becomes ``ds_db``. Distinct raw ids can collapse
+        onto the same result.
+        """
+        return "".join(ch if ch.isascii() and ch.isalnum() else "_" for ch in raw_id)
+
+    def to_mermaid(self) -> str:
+        """Return Mermaid flowchart source for the service-level graph.
+
+        Service, datastore and queue nodes are declared with their own shape and
+        style class; nodes of other types are not declared. Edge kinds in
+        ``_MERMAID_SKIPPED_KINDS`` are omitted and each ``(source, target, kind)``
+        triple is drawn once.
+        """
+        with self._conn() as conn:
+            node_rows = conn.execute("SELECT id, node_type, data FROM graph_nodes").fetchall()
+            edge_rows = conn.execute("SELECT source_id, target_id, kind FROM graph_edges").fetchall()
+
+        # Keyed by node_type; the key doubles as the Mermaid style class name.
+        declared = {"service": [], "datastore": [], "queue": []}
+        for row in node_rows:
+            bucket = declared.get(row["node_type"])
+            if bucket is None:
+                continue
+            data = json.loads(row["data"])
+            # A missing, null or empty name falls back to the id; double quotes
+            # are swapped out because the label itself is double-quoted below.
+            label = str(data.get("name") or row["id"]).replace('"', "'")
+            bucket.append((self._mermaid_id(row["id"]), label))
+
+        connections = []
+        seen = set()
+        for row in edge_rows:
+            kind = row["kind"]
+            if kind in self._MERMAID_SKIPPED_KINDS:
+                continue
+            src = self._mermaid_id(row["source_id"])
+            tgt = self._mermaid_id(row["target_id"])
+            if (src, tgt, kind) in seen:
+                continue
+            seen.add((src, tgt, kind))
+            arrow = self._MERMAID_ARROWS.get(kind, "-->")
+            connections.append(f"    {src} {arrow} {tgt}")
+
+        lines = ["graph LR", "    %% Services"]
+        lines.extend(f'    {nid}["{label}"]' for nid, label in declared["service"])
+        if declared["datastore"]:
+            lines.append("    %% Data Stores")
+            lines.extend(f'    {nid}[("{label}")]' for nid, label in declared["datastore"])
+        if declared["queue"]:
+            lines.append("    %% Queues")
+            lines.extend(f'    {nid}>"{label}"]' for nid, label in declared["queue"])
+
+        lines.append("    %% Edges")
+        lines.extend(connections)
+
+        lines.append("    %% Styling")
+        lines.append("    classDef service fill:#161b22,stroke:#39d353,stroke-width:2px,color:#c9d1d9;")
+        lines.append("    classDef datastore fill:#161b22,stroke:#ffa657,stroke-width:2px,color:#c9d1d9;")
+        lines.append("    classDef queue fill:#161b22,stroke:#bc8cff,stroke-width:2px,color:#c9d1d9;")
+        for style_class, members in declared.items():
+            lines.extend(f"    class {nid} {style_class}" for nid, _label in members)
+
+        return "\n".join(lines)
+
+    def to_json(self) -> str:
+        """Return the service-level graph as a JSON document.
+
+        The document has a ``nodes`` and an ``edges`` list. Method nodes are not
+        exported; they are only tallied into ``method_count`` on their service.
+        Every flow node with a ``service_id`` gets a ``flow_link`` edge from that
+        service. ``method_call`` and ``flow_step`` edges are omitted and each
+        ``(source, target, kind)`` triple appears once.
+        """
+        with self._conn() as conn:
+            node_rows = conn.execute("SELECT id, node_type, data FROM graph_nodes").fetchall()
+            edge_rows = conn.execute(
+                "SELECT source_id, target_id, kind, metadata FROM graph_edges"
+            ).fetchall()
+
+        # Flow links already stored as edges are exported from the edge table (with
+        # their metadata), so they must not be synthesized a second time below.
+        stored_keys = {(r["source_id"], r["target_id"], r["kind"]) for r in edge_rows}
+
+        nodes = []
+        edges = []
+        method_counts: Dict[str, int] = {}
+        for row in node_rows:
+            ntype = row["node_type"]
+            data = json.loads(row["data"])
+            if ntype == "method":
+                # Methods would clutter a service-level graph; just count them.
+                sid = data.get("service_id", "")
+                if sid:
+                    method_counts[sid] = method_counts.get(sid, 0) + 1
+                continue
+
+            node = {"id": row["id"], "type": ntype}
+            if ntype == "service":
+                node.update({
+                    "label": data.get("name", row["id"]),
+                    "language": data.get("language", ""),
+                    "service_type": data.get("service_type", "api"),
+                    "tags": data.get("tags", []),
+                })
+            elif ntype in ("datastore", "queue"):
+                node.update({"label": data.get("name", row["id"]), "kind": data.get("kind", "")})
+            elif ntype == "flow":
+                svc_id = data.get("service_id", "")
+                node.update({
+                    "label": data.get("name", row["id"]),
+                    "service_id": svc_id,
+                    "step_count": data.get("step_count", 0),
+                })
+                # A flow without a service has nothing to link from.
+                if svc_id and (svc_id, row["id"], "flow_link") not in stored_keys:
+                    edges.append({
+                        "source": svc_id,
+                        "target": row["id"],
+                        "kind": "flow_link",
+                        "meta": {},
+                    })
+            nodes.append(node)
+
+        for node in nodes:
+            if node["type"] == "service":
+                node["method_count"] = method_counts.get(node["id"], 0)
+
+        seen = set()
+        for row in edge_rows:
+            key = (row["source_id"], row["target_id"], row["kind"])
+            if row["kind"] in ("method_call", "flow_step") or key in seen:
+                continue
+            seen.add(key)
+            edges.append({
+                "source": row["source_id"],
+                "target": row["target_id"],
+                "kind": row["kind"],
+                "meta": json.loads(row["metadata"] or "{}"),
+            })
+
+        return json.dumps({"nodes": nodes, "edges": edges}, indent=2)
+
     def clear(self) -> None:
         """Delete all graph data."""
         with self._conn() as conn:
diff --git a/tests/test_graph_export.py b/tests/test_graph_export.py
new file mode 100644
index 0000000..eda4e8e
--- /dev/null
+++ b/tests/test_graph_export.py
@@ -0,0 +1,141 @@
+"""Tests for graph export functionality and CLI command."""
+
+import json
+from pathlib import Path
+
+import pytest
+from typer.testing import CliRunner
+
+from corbell.cli.commands.graph import app
+from corbell.core.graph.schema import (
+    DataStoreNode,
+    DependencyEdge,
+    QueueNode,
+    ServiceNode,
+)
+from corbell.core.graph.sqlite_store import SQLiteGraphStore
+
+
+@pytest.fixture
+def store(tmp_db):
+    return SQLiteGraphStore(tmp_db)
+
+
+@pytest.fixture
+def populated_store(store):
+    # Services
+    store.upsert_node(ServiceNode(id="svc-a", name="Service A", repo="/r/a", language="python"))
+    store.upsert_node(ServiceNode(id="svc-b", name="Service B", repo="/r/b", language="typescript"))
+
+    # Datastore & Queue
+    store.upsert_node(DataStoreNode(id="ds:db", kind="postgres", name="Main DB"))
+    store.upsert_node(QueueNode(id="q:queue", kind="sqs", name="Main Queue"))
+
+    # Edges
+    store.upsert_edge(DependencyEdge(source_id="svc-a", target_id="svc-b", kind="http_call"))
+    store.upsert_edge(DependencyEdge(source_id="svc-a", target_id="ds:db", kind="db_read"))
+    store.upsert_edge(DependencyEdge(source_id="svc-b", target_id="q:queue", kind="queue_publish"))
+
+    return store
+
+
+@pytest.fixture
+def patched_load(populated_store, sample_workspace_yaml, monkeypatch):
+    """Point the CLI's workspace loader at the populated temporary database."""
+
+    class MockConfig:
+        services = []
+
+        def db_path(self, cfg_dir):
+            return populated_store.db_path
+
+    def mock_load(ws_dir):
+        return MockConfig(), Path(sample_workspace_yaml).parent
+
+    monkeypatch.setattr("corbell.cli.commands.graph._load", mock_load)
+
+
+def test_to_mermaid(populated_store):
+    mermaid_str = populated_store.to_mermaid()
+
+    # Check node definitions
+    assert 'svc_a["Service A"]' in mermaid_str
+    assert 'svc_b["Service B"]' in mermaid_str
+    assert 'ds_db[("Main DB")]' in mermaid_str
+    assert 'q_queue>"Main Queue"]' in mermaid_str
+
+    # Check edge connections (using safe replaced IDs)
+    assert "svc_a -- HTTP --> svc_b" in mermaid_str
+    assert "svc_a -- Reads --> ds_db" in mermaid_str
+    assert "svc_b -- Publishes --> q_queue" in mermaid_str
+
+    # Check styling classes
+    assert "classDef service fill:#161b22,stroke:#39d353,stroke-width:2px,color:#c9d1d9;" in mermaid_str
+    assert "class svc_a service" in mermaid_str
+    assert "class ds_db datastore" in mermaid_str
+    assert "class q_queue queue" in mermaid_str
+
+
+def test_to_mermaid_sanitizes_ids_and_labels(store):
+    store.upsert_node(ServiceNode(id="team/svc a", name='Say "hi"', repo="/r/x", language="go"))
+
+    mermaid_str = store.to_mermaid()
+
+    assert "team_svc_a[\"Say 'hi'\"]" in mermaid_str
+    assert "class team_svc_a service" in mermaid_str
+
+
+def test_to_json(populated_store):
+    data = json.loads(populated_store.to_json())
+
+    assert "nodes" in data
+    assert "edges" in data
+
+    nodes = {n["id"]: n for n in data["nodes"]}
+    assert "svc-a" in nodes
+    assert nodes["svc-a"]["label"] == "Service A"
+    assert nodes["svc-a"]["language"] == "python"
+
+    assert "ds:db" in nodes
+    assert nodes["ds:db"]["kind"] == "postgres"
+
+    assert "q:queue" in nodes
+    assert nodes["q:queue"]["kind"] == "sqs"
+
+    edges = {(e["source"], e["target"]): e for e in data["edges"]}
+    assert ("svc-a", "svc-b") in edges
+    assert edges[("svc-a", "svc-b")]["kind"] == "http_call"
+
+
+def test_cli_export_stdout(patched_load):
+    runner = CliRunner()
+
+    # Test mermaid format to stdout
+    result = runner.invoke(app, ["export", "--format", "mermaid"])
+    assert result.exit_code == 0
+    assert "graph LR" in result.stdout
+    assert "svc_a -- HTTP --> svc_b" in result.stdout
+
+    # Test json format to stdout
+    result = runner.invoke(app, ["export", "--format", "json"])
+    assert result.exit_code == 0
+    assert '"nodes": [' in result.stdout
+
+
+def test_cli_export_file(patched_load, tmp_path):
+    runner = CliRunner()
+
+    output_file = tmp_path / "graph.mmd"
+    result = runner.invoke(app, ["export", "--format", "mermaid", "--output", str(output_file)])
+    assert result.exit_code == 0
+    assert output_file.exists()
+
+    # The command writes UTF-8, so read it back the same way on every platform.
+    file_content = output_file.read_text(encoding="utf-8")
+    assert "graph LR" in file_content
+    assert "svc_a -- HTTP --> svc_b" in file_content
+
+
+def test_cli_export_rejects_unknown_format(patched_load):
+    result = CliRunner().invoke(app, ["export", "--format", "dot"])
+    assert result.exit_code == 1