Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion code-review-graph-vscode/src/backend/sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ export class SqliteReader {
if (row) {
const version = parseInt(row.value, 10);
// Must match LATEST_VERSION in code_review_graph/migrations.py
const SUPPORTED_SCHEMA_VERSION = 6;
const SUPPORTED_SCHEMA_VERSION = 8;
if (!isNaN(version) && version > SUPPORTED_SCHEMA_VERSION) {
return `Database was created with a newer version (schema v${version}). Update the extension.`;
}
Expand Down
20 changes: 12 additions & 8 deletions code_review_graph/changes.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,19 @@ def compute_risk_score(store: GraphStore, node: GraphNode) -> float:
Scoring factors:
- Flow participation: 0.05 per flow membership, capped at 0.25
- Community crossing: 0.05 per caller from a different community, capped at 0.15
- Test coverage: 0.30 if no TESTED_BY edges, 0.05 if tested
- Test coverage: 0.30 (untested) scaling down to 0.05 (5+ TESTED_BY edges)
- Security sensitivity: 0.20 if name matches security keywords
- Caller count: callers / 20, capped at 0.10
"""
score = 0.0

# --- Flow participation (cap 0.25) ---
flow_count = store.count_flow_memberships(node.id)
score += min(flow_count * 0.05, 0.25)
# --- Flow participation (cap 0.25), weighted by criticality ---
flow_criticalities = store.get_flow_criticalities_for_node(node.id)
if flow_criticalities:
score += min(sum(flow_criticalities), 0.25)
else:
flow_count = store.count_flow_memberships(node.id)
score += min(flow_count * 0.05, 0.25)

# --- Community crossing (cap 0.15) ---
callers = store.get_edges_by_target(node.qualified_name)
Expand All @@ -179,10 +183,10 @@ def compute_risk_score(store: GraphStore, node: GraphNode) -> float:
cross_community += 1
score += min(cross_community * 0.05, 0.15)

# --- Test coverage ---
tested_edges = store.get_edges_by_target(node.qualified_name)
has_test = any(e.kind == "TESTED_BY" for e in tested_edges)
score += 0.05 if has_test else 0.30
# --- Test coverage (direct + transitive) ---
transitive_tests = store.get_transitive_tests(node.qualified_name)
test_count = len(transitive_tests)
score += 0.30 - (min(test_count / 5.0, 1.0) * 0.25)

# --- Security sensitivity ---
name_lower = node.name.lower()
Expand Down
138 changes: 103 additions & 35 deletions code_review_graph/communities.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,19 @@ def _compute_cohesion_batch(
return results


def _build_adjacency(edges: list[GraphEdge]) -> dict[str, list[str]]:
"""Build adjacency list from edges (one pass over all edges)."""
adj: dict[str, list[str]] = defaultdict(list)
for e in edges:
adj[e.source_qualified].append(e.target_qualified)
adj[e.target_qualified].append(e.source_qualified)
return adj


def _compute_cohesion(
member_qns: set[str], all_edges: list[GraphEdge]
member_qns: set[str],
all_edges: list[GraphEdge],
adj: dict[str, list[str]] | None = None,
) -> float:
"""Compute cohesion: internal_edges / (internal_edges + external_edges).

Expand All @@ -213,7 +224,10 @@ def _compute_cohesion(


def _detect_leiden(
nodes: list[GraphNode], edges: list[GraphEdge], min_size: int
nodes: list[GraphNode],
edges: list[GraphEdge],
min_size: int,
adj: dict[str, list[str]] | None = None,
) -> list[dict[str, Any]]:
"""Detect communities using Leiden algorithm via igraph.

Expand Down Expand Up @@ -251,11 +265,18 @@ def _detect_leiden(
weights.append(EDGE_WEIGHTS.get(e.kind, 0.5))

if not edge_list:
return _detect_file_based(nodes, edges, min_size)
return _detect_file_based(nodes, edges, min_size, adj=adj)

g.add_edges(edge_list)
g.es["weight"] = weights

# Run Leiden -- scale resolution inversely with graph size to get
# coarser clusters on large repos. Default resolution=1.0 produces
# thousands of tiny communities for 30k+ node graphs.
import math
n_nodes = g.vcount()
resolution = max(0.05, 1.0 / math.log10(max(n_nodes, 10)))

logger.info(
"Running Leiden on %d nodes, %d edges...",
g.vcount(), g.ecount(),
Expand All @@ -264,6 +285,7 @@ def _detect_leiden(
partition = g.community_leiden(
objective_function="modularity",
weights="weight",
resolution=resolution,
n_iterations=2,
)

Expand Down Expand Up @@ -311,28 +333,73 @@ def _detect_leiden(


def _detect_file_based(
nodes: list[GraphNode], edges: list[GraphEdge], min_size: int
nodes: list[GraphNode],
edges: list[GraphEdge],
min_size: int,
adj: dict[str, list[str]] | None = None,
) -> list[dict[str, Any]]:
"""Group nodes by file_path when igraph is not available."""
by_file: dict[str, list[GraphNode]] = defaultdict(list)
"""Group nodes by directory when Leiden is unavailable or over-fragments.

Strips the longest common directory prefix from all file paths, then
adaptively picks a grouping depth that yields 10-200 communities.
"""
# Collect all directory paths (normalized, without filename)
all_dir_parts: list[list[str]] = []
for n in nodes:
by_file[n.file_path].append(n)
parts = n.file_path.replace("\\", "/").split("/")
all_dir_parts.append([p for p in parts[:-1] if p])

# Find the longest common prefix among directory parts
prefix_len = 0
if all_dir_parts:
shortest = min(len(p) for p in all_dir_parts)
for i in range(shortest):
seg = all_dir_parts[0][i]
if all(p[i] == seg for p in all_dir_parts):
prefix_len = i + 1
else:
break

def _group_at_depth(depth: int) -> dict[str, list[GraphNode]]:
groups: dict[str, list[GraphNode]] = defaultdict(list)
for n in nodes:
parts = n.file_path.replace("\\", "/").split("/")
dir_parts = [p for p in parts[:-1] if p]
remainder = dir_parts[prefix_len:]
if remainder:
key = "/".join(remainder[:depth])
else:
key = parts[-1].rsplit(".", 1)[0] if parts else "root"
groups[key].append(n)
return groups

# Try increasing depths until we get 10-200 qualifying groups
max_depth = max((len(p) - prefix_len for p in all_dir_parts), default=0)
best_groups = _group_at_depth(1) # depth=1 always works (file stem fallback)
for depth in range(1, max_depth + 1):
groups = _group_at_depth(depth)
qualifying = sum(1 for v in groups.values() if len(v) >= min_size)
best_groups = groups
if qualifying >= 10:
break

by_dir = best_groups

# Pre-filter to communities meeting min_size and collect their member
# sets so we can batch-compute all cohesions in a single O(edges) pass.
# Without this, per-community cohesion is O(edges * files), which makes
# community detection effectively hang on large repos.
pending: list[tuple[str, list[GraphNode], set[str]]] = []
for file_path, members in by_file.items():
for dir_path, members in by_dir.items():
if len(members) < min_size:
continue
member_qns = {m.qualified_name for m in members}
pending.append((file_path, members, member_qns))
pending.append((dir_path, members, member_qns))

cohesions = _compute_cohesion_batch([p[2] for p in pending], edges)

communities: list[dict[str, Any]] = []
for (file_path, members, member_qns), cohesion in zip(pending, cohesions):
for (dir_path, members, member_qns), cohesion in zip(pending, cohesions):
lang_counts = Counter(m.language for m in members if m.language)
dominant_lang = lang_counts.most_common(1)[0][0] if lang_counts else ""
name = _generate_community_name(members)
Expand All @@ -343,7 +410,7 @@ def _detect_file_based(
"size": len(members),
"cohesion": round(cohesion, 4),
"dominant_language": dominant_lang,
"description": f"File-based community: {file_path}",
"description": f"Directory-based community: {dir_path}",
"members": [m.qualified_name for m in members],
"member_qns": member_qns,
})
Expand Down Expand Up @@ -374,28 +441,10 @@ def detect_communities(
"""
# Gather all nodes (exclude File nodes to focus on code entities)
all_edges = store.get_all_edges()
all_files = store.get_all_files()
unique_nodes = store.get_all_nodes(exclude_files=True)

logger.info("Loading nodes from %d files...", len(all_files))

nodes: list[GraphNode] = []
for fp in all_files:
nodes.extend(store.get_nodes_by_file(fp))

# Also gather nodes from files referenced in edges but not in all_files
edge_files: set[str] = set()
for e in all_edges:
edge_files.add(e.file_path)
for fp in edge_files - set(all_files):
nodes.extend(store.get_nodes_by_file(fp))

# Deduplicate by qualified_name
seen_qns: set[str] = set()
unique_nodes: list[GraphNode] = []
for n in nodes:
if n.qualified_name not in seen_qns:
seen_qns.add(n.qualified_name)
unique_nodes.append(n)
# Build adjacency index once for fast cohesion computation
adj = _build_adjacency(all_edges)

logger.info(
"Loaded %d unique nodes, %d edges",
Expand All @@ -404,10 +453,10 @@ def detect_communities(

if IGRAPH_AVAILABLE:
logger.info("Detecting communities with Leiden algorithm (igraph)")
results = _detect_leiden(unique_nodes, all_edges, min_size)
results = _detect_leiden(unique_nodes, all_edges, min_size, adj=adj)
else:
logger.info("igraph not available, using file-based community detection")
results = _detect_file_based(unique_nodes, all_edges, min_size)
results = _detect_file_based(unique_nodes, all_edges, min_size, adj=adj)

# Convert member_qns (internal set) to a list for serialization safety,
# then strip it from the returned dicts to avoid leaking internal state.
Expand Down Expand Up @@ -569,6 +618,17 @@ def get_communities(
return communities


_TEST_COMMUNITY_RE = re.compile(
r"(^test[-/]|[-/]test([:/]|$)|it:should|describe:|spec[-/]|[-/]spec$)",
re.IGNORECASE,
)


def _is_test_community(name: str) -> bool:
"""Return True if a community name indicates it is test-dominated."""
return bool(_TEST_COMMUNITY_RE.search(name))


def get_architecture_overview(store: GraphStore) -> dict[str, Any]:
"""Generate an architecture overview based on community structure.

Expand Down Expand Up @@ -596,6 +656,10 @@ def get_architecture_overview(store: GraphStore) -> dict[str, Any]:
cross_counts: Counter[tuple[int, int]] = Counter()

for e in all_edges:
# TESTED_BY edges are expected cross-community coupling (test → code),
# not an architectural smell.
if e.kind == "TESTED_BY":
continue
src_comm = node_to_community.get(e.source_qualified)
tgt_comm = node_to_community.get(e.target_qualified)
if (
Expand All @@ -613,13 +677,17 @@ def get_architecture_overview(store: GraphStore) -> dict[str, Any]:
"target": _sanitize_name(e.target_qualified),
})

# Generate warnings for high coupling
# Generate warnings for high coupling, skipping test-dominated pairs.
warnings: list[str] = []
comm_name_map = {c.get("id", 0): c["name"] for c in communities}
for (c1, c2), count in cross_counts.most_common():
if count > 10:
name1 = comm_name_map.get(c1, f"community-{c1}")
name2 = comm_name_map.get(c2, f"community-{c2}")
# Skip pairs where either community is test-dominated — coupling
# between test and production code is expected, not architectural.
if _is_test_community(name1) or _is_test_community(name2):
continue
warnings.append(
f"High coupling ({count} edges) between "
f"'{name1}' and '{name2}'"
Expand Down
Loading
Loading