Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions corbell/cli/commands/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,29 @@ def graph_callpath(
console.print(f"[green]Found {len(paths)} path(s):[/green]")
for i, path in enumerate(paths, 1):
console.print(f"\n Path {i}: {' → '.join(path)}")


@app.command("export")
def graph_export(
workspace: Optional[Path] = typer.Option(None, "--workspace", "-w", help="Workspace directory."),
output: Optional[Path] = typer.Option(None, "--output", "-o", help="Output file path (default: stdout)."),
format: str = typer.Option("mermaid", "--format", "-f", help="Output format: mermaid | json"),
):
"""Export the service graph to Mermaid or JSON format."""
cfg, config_dir = _load(workspace)
store = _get_store(cfg, config_dir)

if format == "mermaid":
content = store.to_mermaid()
elif format == "json":
content = store.to_json()
else:
console.print(f"[red]Unsupported format: {format}. Must be 'mermaid' or 'json'.[/red]")
raise typer.Exit(1)

if output:
output.write_text(content, encoding="utf-8")
console.print(f"[green]✓ Exported service graph to {output}[/green]")
else:
print(content)

10 changes: 10 additions & 0 deletions corbell/core/graph/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,16 @@ def get_all_nodes_summary(self) -> Dict[str, Any]:
"""Return a summary of all nodes and edges for display."""
...

@abstractmethod
def to_mermaid(self) -> str:
"""Return a Mermaid representation of the graph."""
...

@abstractmethod
def to_json(self) -> str:
"""Return the JSON representation of the service graph."""
...

@abstractmethod
def clear(self) -> None:
"""Remove all data from the store."""
Expand Down
154 changes: 154 additions & 0 deletions corbell/core/graph/sqlite_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,160 @@ def get_all_nodes_summary(self) -> Dict[str, Any]:
"edges": edge_count,
}

def to_mermaid(self) -> str:
"""Return a single Mermaid file string describing the system boundaries."""
with self._conn() as conn:
rows = conn.execute("SELECT id, node_type, data FROM graph_nodes").fetchall()
services, datastores, queues = [], [], []

for row in rows:
ntype = row["node_type"]
data = json.loads(row["data"])
nid = row["id"].replace("-", "_").replace(".", "_").replace(":", "_")
label = data.get("name", row["id"]).replace('"', "'")
if ntype == "service":
services.append({"id": nid, "label": label})
elif ntype == "datastore":
datastores.append({"id": nid, "label": label})
elif ntype == "queue":
queues.append({"id": nid, "label": label})

edges = conn.execute("SELECT source_id, target_id, kind FROM graph_edges").fetchall()
connections = []
seen = set()
for e in edges:
kind = e["kind"]
if kind in ("method_call", "flow_step", "git_coupling", "flow_link"):
continue
src = e["source_id"].replace("-", "_").replace(".", "_").replace(":", "_")
tgt = e["target_id"].replace("-", "_").replace(".", "_").replace(":", "_")
if (src, tgt, kind) in seen:
continue
seen.add((src, tgt, kind))

if kind == "http_call":
connections.append(f" {src} -- HTTP --> {tgt}")
elif kind == "rpc_call":
connections.append(f" {src} -- RPC/Edge Function --> {tgt}")
elif kind == "db_read":
connections.append(f" {src} -- Reads --> {tgt}")
elif kind == "db_write":
connections.append(f" {src} -- Writes --> {tgt}")
elif kind == "queue_publish":
connections.append(f" {src} -- Publishes --> {tgt}")
elif kind == "queue_consume":
connections.append(f" {src} -- Consumes --> {tgt}")
elif kind == "library_dependency":
connections.append(f" {src} -. Import/Library .-> {tgt}")
else:
connections.append(f" {src} --> {tgt}")

lines = ["graph LR"]
lines.append(" %% Services")
for s in services:
lines.append(f' {s["id"]}["{s["label"]}"]')

if datastores:
lines.append(" %% Data Stores")
for d in datastores:
lines.append(f' {d["id"]}[("{d["label"]}")]')

if queues:
lines.append(" %% Queues")
for q in queues:
lines.append(f' {q["id"]}>"{q["label"]}"]')

lines.append(" %% Edges")
lines.extend(connections)

lines.append(" %% Styling")
lines.append(" classDef service fill:#161b22,stroke:#39d353,stroke-width:2px,color:#c9d1d9;")
lines.append(" classDef datastore fill:#161b22,stroke:#ffa657,stroke-width:2px,color:#c9d1d9;")
lines.append(" classDef queue fill:#161b22,stroke:#bc8cff,stroke-width:2px,color:#c9d1d9;")

for s in services:
lines.append(f' class {s["id"]} service')
for d in datastores:
lines.append(f' class {d["id"]} datastore')
for q in queues:
lines.append(f' class {q["id"]} queue')

return "\n".join(lines)

def to_json(self) -> str:
"""Return the JSON representation of the service graph."""
nodes = []
edges = []
with self._conn() as conn:
rows = conn.execute("SELECT id, node_type, data FROM graph_nodes").fetchall()
for row in rows:
ntype = row["node_type"]
data = json.loads(row["data"])
node = {"id": row["id"], "type": ntype}
if ntype == "service":
node.update({
"label": data.get("name", row["id"]),
"language": data.get("language", ""),
"service_type": data.get("service_type", "api"),
"tags": data.get("tags", []),
})
elif ntype == "datastore":
node.update({"label": data.get("name", row["id"]), "kind": data.get("kind", "")})
elif ntype == "queue":
node.update({"label": data.get("name", row["id"]), "kind": data.get("kind", "")})
elif ntype == "flow":
svc_id = data.get("service_id", "")
node.update({
"label": data.get("name", row["id"]),
"service_id": svc_id,
"step_count": data.get("step_count", 0),
})
edges.append({
"source": svc_id,
"target": row["id"],
"kind": "flow_link",
"meta": {}
})
elif ntype == "method":
continue # don't clutter service-level graph
nodes.append(node)

# Count methods per service for node sizing
method_counts: Dict[str, int] = {}
mcounts = conn.execute(
"SELECT data FROM graph_nodes WHERE node_type='method'"
).fetchall()
for row in mcounts:
d = json.loads(row["data"])
sid = d.get("service_id", "")
if sid:
method_counts[sid] = method_counts.get(sid, 0) + 1
for n in nodes:
if n["type"] == "service":
n["method_count"] = method_counts.get(n["id"], 0)

# Edges
skip_kinds = {"method_call", "flow_step"}
erows = conn.execute(
"SELECT source_id, target_id, kind, metadata FROM graph_edges"
).fetchall()
seen = set()
for row in erows:
if row["kind"] in skip_kinds:
continue
key = (row["source_id"], row["target_id"], row["kind"])
if key in seen:
continue
seen.add(key)
meta = json.loads(row["metadata"] or "{}")
edges.append({
"source": row["source_id"],
"target": row["target_id"],
"kind": row["kind"],
"meta": meta,
})
return json.dumps({"nodes": nodes, "edges": edges}, indent=2)

def clear(self) -> None:
"""Delete all graph data."""
with self._conn() as conn:
Expand Down
129 changes: 129 additions & 0 deletions tests/test_graph_export.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""Tests for graph export functionality and CLI command."""

import json
from pathlib import Path
import pytest
from typer.testing import CliRunner

from corbell.core.graph.schema import (
DataStoreNode,
DependencyEdge,
QueueNode,
ServiceNode,
)
from corbell.core.graph.sqlite_store import SQLiteGraphStore
from corbell.cli.commands.graph import app


@pytest.fixture
def store(tmp_db):
return SQLiteGraphStore(tmp_db)


@pytest.fixture
def populated_store(store):
# Services
store.upsert_node(ServiceNode(id="svc-a", name="Service A", repo="/r/a", language="python"))
store.upsert_node(ServiceNode(id="svc-b", name="Service B", repo="/r/b", language="typescript"))

# Datastore & Queue
store.upsert_node(DataStoreNode(id="ds:db", kind="postgres", name="Main DB"))
store.upsert_node(QueueNode(id="q:queue", kind="sqs", name="Main Queue"))

# Edges
store.upsert_edge(DependencyEdge(source_id="svc-a", target_id="svc-b", kind="http_call"))
store.upsert_edge(DependencyEdge(source_id="svc-a", target_id="ds:db", kind="db_read"))
store.upsert_edge(DependencyEdge(source_id="svc-b", target_id="q:queue", kind="queue_publish"))

return store


def test_to_mermaid(populated_store):
mermaid_str = populated_store.to_mermaid()

# Check node definitions
assert "svc_a[\"Service A\"]" in mermaid_str
assert "svc_b[\"Service B\"]" in mermaid_str
assert "ds_db[(\"Main DB\")]" in mermaid_str
assert "q_queue>\"Main Queue\"]" in mermaid_str

# Check edge connections (using safe replaced IDs)
assert "svc_a -- HTTP --> svc_b" in mermaid_str
assert "svc_a -- Reads --> ds_db" in mermaid_str
assert "svc_b -- Publishes --> q_queue" in mermaid_str

# Check styling classes
assert "classDef service fill:#161b22,stroke:#39d353,stroke-width:2px,color:#c9d1d9;" in mermaid_str
assert "class svc_a service" in mermaid_str
assert "class ds_db datastore" in mermaid_str
assert "class q_queue queue" in mermaid_str


def test_to_json(populated_store):
json_str = populated_store.to_json()
data = json.loads(json_str)

assert "nodes" in data
assert "edges" in data

nodes = {n["id"]: n for n in data["nodes"]}
assert "svc-a" in nodes
assert nodes["svc-a"]["label"] == "Service A"
assert nodes["svc-a"]["language"] == "python"

assert "ds:db" in nodes
assert nodes["ds:db"]["kind"] == "postgres"

assert "q:queue" in nodes
assert nodes["q:queue"]["kind"] == "sqs"

edges = {(e["source"], e["target"]): e for e in data["edges"]}
assert ("svc-a", "svc-b") in edges
assert edges[("svc-a", "svc-b")]["kind"] == "http_call"


def test_cli_export_stdout(populated_store, sample_workspace_yaml, monkeypatch):
runner = CliRunner()

# Mock workspace config loading to point to our temp db
def mock_load(ws_dir):
class MockConfig:
services = []
def db_path(self, cfg_dir):
return populated_store.db_path
return MockConfig(), Path(sample_workspace_yaml).parent

monkeypatch.setattr("corbell.cli.commands.graph._load", mock_load)

# Test mermaid format to stdout
result = runner.invoke(app, ["export", "--format", "mermaid"])
assert result.exit_code == 0
assert "graph LR" in result.stdout
assert "svc_a -- HTTP --> svc_b" in result.stdout

# Test json format to stdout
result = runner.invoke(app, ["export", "--format", "json"])
assert result.exit_code == 0
assert '"nodes": [' in result.stdout


def test_cli_export_file(populated_store, sample_workspace_yaml, monkeypatch, tmp_path):
runner = CliRunner()

def mock_load(ws_dir):
class MockConfig:
services = []
def db_path(self, cfg_dir):
return populated_store.db_path
return MockConfig(), Path(sample_workspace_yaml).parent

monkeypatch.setattr("corbell.cli.commands.graph._load", mock_load)

output_file = tmp_path / "graph.mmd"
result = runner.invoke(app, ["export", "--format", "mermaid", "--output", str(output_file)])
assert result.exit_code == 0
assert output_file.exists()

file_content = output_file.read_text()
assert "graph LR" in file_content
assert "svc_a -- HTTP --> svc_b" in file_content
Loading