From a8f589ef91715e58ff901615e89574ff34907f0b Mon Sep 17 00:00:00 2001 From: RaghavChamadiya Date: Tue, 7 Apr 2026 15:33:17 +0530 Subject: [PATCH 1/4] =?UTF-8?q?feat:=20pipeline=20overhaul=20=E2=80=94=20R?= =?UTF-8?q?AG=20generation,=20parallel=20ingest,=20atomic=20stores,=20cost?= =?UTF-8?q?=20tracking,=20PR=20blast=20radius?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds 11 capabilities across the indexing pipeline, persistence layer, MCP tools, and CLI. MCP tool count is unchanged; new functionality is folded into existing tools (get_risk, get_overview, get_dead_code). Pipeline & generation - ProcessPool-based parsing with sequential fallback; ingestion and git stages now run concurrently via asyncio.gather - RAG-aware doc generation: dependency summaries are pre-fetched from the vector store and injected into the file_page prompt; pages generated in topological order so leaves are summarized before their dependents - Dynamic import hint extractors (Django INSTALLED_APPS/ROOT_URLCONF/ MIDDLEWARE/url include, pytest conftest fixtures, Node package.json exports + tsconfig path aliases) wired into GraphBuilder.add_dynamic_edges Persistence - AtomicStorageCoordinator with async transaction() context manager and health_check() spanning SQL, in-memory graph, and vector store - recompute_git_percentiles now uses a single SQL PERCENT_RANK() window function instead of in-memory Python ranking - New temporal_hotspot_score column on git_metadata, computed via exp decay (180-day half-life) and used as the primary percentile sort key - New llm_costs and security_findings tables; matching ORM models - vector_store.get_page_summary_by_path() on all three backends Cost tracking - CostTracker with per-call recording, persisted to llm_costs; pricing table covers Claude 4.6 family, GPT-4o, and Gemini 1.5/2.5/3.x variants - Wired into Anthropic, Gemini, OpenAI, and LiteLLM providers - Live USD column on the indexing progress bar - New `repowise costs` CLI grouping by operation/model/day Analysis - PRBlastRadiusAnalyzer: transitive ancestor BFS over graph_edges, co-change warnings, recommended reviewers by temporal ownership, test gaps, 0–10 overall risk score - SecurityScanner: pattern-based scan for eval/exec/pickle/raw SQL/ hardcoded secrets/weak hashes; persisted at index time MCP tool extensions - get_risk(changed_files=[...]) returns blast radius; per-file payload now includes test_gap and security_signals - get_overview returns knowledge_map with top owners, knowledge silos (>80% ownership), and onboarding targets - get_dead_code accepts min_confidence, include_internals, include_zombie_packages, no_unreachable, no_unused_exports CLI - `repowise dead-code` exposes the same sensitivity flags - `repowise doctor` adds a coordinator drift health check (Check #10) - `repowise costs` command registered Tests - test_models.py: expected table set updated to include llm_costs and security_findings; full suite green (757 passed, 9 skipped) - End-to-end validated against test-repos/microdot: 164 files ingested, 83 pages generated, 132 git_metadata rows with temporal hotspot score, 83 cost rows totaling $0.0258, 2 security findings, drift = 0 --- README.md | 64 +++- .../src/repowise/cli/commands/costs_cmd.py | 157 ++++++++++ .../repowise/cli/commands/dead_code_cmd.py | 38 ++- .../src/repowise/cli/commands/doctor_cmd.py | 71 +++++ .../cli/src/repowise/cli/commands/init_cmd.py | 43 +++ packages/cli/src/repowise/cli/main.py | 2 + packages/cli/src/repowise/cli/ui.py | 
12 +- .../core/alembic/versions/0009_llm_costs.py | 54 ++++ .../versions/0010_temporal_hotspot_score.py | 38 +++ .../versions/0011_security_findings.py | 51 ++++ .../src/repowise/core/analysis/pr_blast.py | 282 ++++++++++++++++++ .../repowise/core/analysis/security_scan.py | 128 ++++++++ .../repowise/core/generation/cost_tracker.py | 264 ++++++++++++++++ .../core/generation/page_generator.py | 62 ++++ .../core/generation/templates/file_page.j2 | 9 + .../core/ingestion/dynamic_hints/__init__.py | 6 + .../core/ingestion/dynamic_hints/base.py | 21 ++ .../core/ingestion/dynamic_hints/django.py | 152 ++++++++++ .../core/ingestion/dynamic_hints/node.py | 136 +++++++++ .../ingestion/dynamic_hints/pytest_hints.py | 103 +++++++ .../core/ingestion/dynamic_hints/registry.py | 32 ++ .../repowise/core/ingestion/git_indexer.py | 30 +- .../core/src/repowise/core/ingestion/graph.py | 20 ++ .../repowise/core/persistence/coordinator.py | 212 +++++++++++++ .../src/repowise/core/persistence/crud.py | 42 ++- .../src/repowise/core/persistence/models.py | 42 +++ .../repowise/core/persistence/vector_store.py | 98 ++++++ .../repowise/core/pipeline/orchestrator.py | 169 +++++++++-- .../src/repowise/core/pipeline/persist.py | 19 ++ .../repowise/core/providers/llm/anthropic.py | 24 +- .../src/repowise/core/providers/llm/gemini.py | 23 +- .../repowise/core/providers/llm/litellm.py | 24 +- .../src/repowise/core/providers/llm/openai.py | 24 +- .../server/mcp_server/tool_dead_code.py | 42 ++- .../server/mcp_server/tool_overview.py | 75 ++++- .../repowise/server/mcp_server/tool_risk.py | 87 +++++- tests/unit/persistence/test_models.py | 2 + 37 files changed, 2593 insertions(+), 65 deletions(-) create mode 100644 packages/cli/src/repowise/cli/commands/costs_cmd.py create mode 100644 packages/core/alembic/versions/0009_llm_costs.py create mode 100644 packages/core/alembic/versions/0010_temporal_hotspot_score.py create mode 100644 packages/core/alembic/versions/0011_security_findings.py create mode 100644 packages/core/src/repowise/core/analysis/pr_blast.py create mode 100644 packages/core/src/repowise/core/analysis/security_scan.py create mode 100644 packages/core/src/repowise/core/generation/cost_tracker.py create mode 100644 packages/core/src/repowise/core/ingestion/dynamic_hints/__init__.py create mode 100644 packages/core/src/repowise/core/ingestion/dynamic_hints/base.py create mode 100644 packages/core/src/repowise/core/ingestion/dynamic_hints/django.py create mode 100644 packages/core/src/repowise/core/ingestion/dynamic_hints/node.py create mode 100644 packages/core/src/repowise/core/ingestion/dynamic_hints/pytest_hints.py create mode 100644 packages/core/src/repowise/core/ingestion/dynamic_hints/registry.py create mode 100644 packages/core/src/repowise/core/persistence/coordinator.py diff --git a/README.md b/README.md index b880388..a887b3b 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,48 @@ The result: Claude Code answers *"why does auth work this way?"* instead of *"he --- +## What's new + +### Faster indexing +Indexing is now fully parallel. A `ProcessPoolExecutor` distributes AST parsing across all CPU cores. Graph construction and git history indexing run concurrently via `asyncio.gather`. Per-file git history is fetched through a thread executor with a semaphore to cap concurrency — full parallelism without overwhelming the system. Large repos index noticeably faster. 
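+
+A minimal sketch of the concurrency pattern, for illustration only (the function names, return shapes, and the concurrency cap below are simplified stand-ins, not the actual repowise internals):
+
+```python
+import asyncio
+from concurrent.futures import ProcessPoolExecutor
+
+
+def parse_file(path: str) -> dict:
+    # CPU-bound AST parsing runs in a worker process (trivial stand-in here).
+    return {"path": path, "symbols": []}
+
+
+async def parse_all(paths: list[str]) -> list[dict]:
+    loop = asyncio.get_running_loop()
+    with ProcessPoolExecutor() as pool:
+        # Fan parsing out across all CPU cores.
+        return await asyncio.gather(
+            *(loop.run_in_executor(pool, parse_file, p) for p in paths)
+        )
+
+
+async def git_history(path: str, sem: asyncio.Semaphore) -> dict:
+    # The semaphore caps how many per-file git lookups run at once.
+    async with sem:
+        return await asyncio.to_thread(lambda: {"path": path, "commits": 0})
+
+
+async def index(paths: list[str]) -> None:
+    sem = asyncio.Semaphore(8)  # illustrative cap, not the real default
+    # Parsing and git-history indexing run concurrently.
+    parsed, history = await asyncio.gather(
+        parse_all(paths),
+        asyncio.gather(*(git_history(p, sem) for p in paths)),
+    )
+    print(f"{len(parsed)} files parsed, {len(history)} histories fetched")
+
+
+if __name__ == "__main__":
+    asyncio.run(index(["a.py", "b.py", "c.py"]))
+```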
+ +### RAG-aware documentation generation +Every wiki page is generated with richer context: before calling the LLM, repowise fetches the already-generated summaries of each file's direct dependencies from the vector store and injects them into the prompt. Generation is topologically sorted so leaf files are always written first. The LLM sees what its dependencies actually do, not just their names — producing more accurate, cross-referenced documentation. + +### Atomic three-store transactions +`AtomicStorageCoordinator` buffers writes across the SQL database, the in-memory dependency graph, and the vector store, then flushes them in a single coordinated operation. If any store fails, all three are rolled back — no partial writes, no silent drift. Run `repowise doctor` to inspect drift across all three stores and repair mismatches. + +### Dynamic import hints +The dependency graph now captures edges that pure AST parsing misses: +- Django `INSTALLED_APPS`, `ROOT_URLCONF`, and `MIDDLEWARE` settings +- pytest fixture wiring through `conftest.py` +- Node/TypeScript path aliases from `tsconfig.json` `paths` and `package.json` `exports` + +These edges appear in `get_context`, `get_risk`, and `get_dependency_path` like any other dependency. + +### Temporal hotspot decay +Hotspot scoring now uses an exponentially time-decayed score with a 180-day half-life layered on top of the raw 90-day churn count. A commit from a year ago contributes roughly 25% as much as a commit from today. The score reflects recent activity, not just total volume. Surfaced in `get_overview` and `get_risk`. + +### Percentile ranks via SQL window function +Incremental updates now recompute global percentile ranks for every file using a single `PERCENT_RANK()` SQL window function. Previously this required loading all rows into Python. The new approach is faster and exact, even on large repos — no sampling, no approximation. + +### PR blast radius +`get_risk(changed_files=[...])` now returns a full blast-radius report: transitive affected files, co-change warnings for historical co-change partners not included in the PR, recommended reviewers ranked by temporal ownership, test gap detection, and an overall 0–10 risk score. Same eight tools — substantially more signal per call. + +### Knowledge map in `get_overview` +`get_overview` now surfaces: top owners across the codebase, "bus factor 1" knowledge silos (files where one person owns >80% of commits), and onboarding targets — high-centrality files with the weakest documentation coverage. Useful for team planning and risk review. + +### Test gaps and security signals in `get_risk` +`get_risk` now includes a `test_gap` flag per file (no test file co-changes detected) and `security_signals` — static pattern detection for common risk categories: `eval`-family calls, hardcoded secrets, raw SQL string construction, and weak cryptography. Signals appear alongside the existing hotspot and ownership data. + +### LLM cost tracking +Every LLM call is logged to a new `llm_costs` table with operation type, model, token counts, and estimated cost. A new `repowise costs` CLI command lets you group spending by operation, model, or day. The indexing progress bar now shows a live `Cost: $X.XXX` counter next to the spinner. 
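+
+Cost is estimated from token counts against a per-million-token pricing table. A worked sketch of the arithmetic (the pricing below matches the tracker's built-in table; the token counts are an invented example call, not real spend):
+
+```python
+# USD per 1 million tokens, as in the tracker's pricing table.
+PRICING = {"claude-haiku-4-5": {"input": 0.8, "output": 4.0}}
+
+
+def call_cost(model: str, input_tokens: int, output_tokens: int) -> float:
+    p = PRICING[model]
+    return (input_tokens * p["input"] + output_tokens * p["output"]) / 1_000_000
+
+
+# A 12,000-token prompt with a 1,500-token completion on claude-haiku-4-5:
+# (12_000 * 0.8 + 1_500 * 4.0) / 1_000_000 = 0.0156
+print(f"${call_cost('claude-haiku-4-5', 12_000, 1_500):.4f}")  # $0.0156
+```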
+ +### Configurable dead-code sensitivity +The `repowise dead-code` command and the `get_dead_code` MCP tool now expose sensitivity controls: `--min-confidence` (default 0.70), `--include-internals` (include private/underscore-prefixed symbols), and `--include-zombie-packages` (packages present in `package.json` / `pyproject.toml` but unused in the graph). Tune the output to your cleanup goals. + +--- + ## What repowise builds repowise runs once, builds everything, then keeps it in sync on every commit. @@ -94,11 +136,11 @@ Most tools are designed around data entities — one module, one file, one symbo |---|---|---| | `get_overview()` | Architecture summary, module map, entry points | First call on any unfamiliar codebase | | `get_context(targets, include?)` | Docs, ownership, decisions, freshness for any targets — files, modules, or symbols | Before reading or modifying code. Pass all relevant targets in one call. | -| `get_risk(targets)` | Hotspot scores, dependents, co-change partners, plain-English risk summary | Before modifying files — understand what could break | +| `get_risk(targets?, changed_files?)` | Hotspot scores, dependents, co-change partners, blast radius, recommended reviewers, test gaps, security signals, 0–10 risk score | Before modifying files — understand what could break | | `get_why(query?)` | Three modes: NL search over decisions · path-based decisions for a file · no-arg health dashboard | Before architectural changes — understand existing intent | | `search_codebase(query)` | Semantic search over the full wiki. Natural language. | When you don't know where something lives | | `get_dependency_path(from, to)` | Connection path between two files, modules, or symbols | When tracing how two things are connected | -| `get_dead_code()` | Unreachable code sorted by confidence and cleanup impact | Cleanup tasks | +| `get_dead_code(min_confidence?, include_internals?, include_zombie_packages?)` | Unreachable code sorted by confidence and cleanup impact | Cleanup tasks | | `get_architecture_diagram(module?)` | Mermaid diagram for the repo or a specific module | Documentation and presentation | ### Tool call comparison — a real task @@ -333,9 +375,18 @@ repowise search "" # semantic search over the wiki repowise status # coverage, freshness, dead code summary # Dead code -repowise dead-code # full report -repowise dead-code --safe-only # only safe-to-delete findings -repowise dead-code resolve # mark resolved / false positive +repowise dead-code # full report +repowise dead-code --safe-only # only safe-to-delete findings +repowise dead-code --min-confidence 0.8 # raise the confidence threshold +repowise dead-code --include-internals # include private/underscore symbols +repowise dead-code --include-zombie-packages # include unused declared packages +repowise dead-code resolve # mark resolved / false positive + +# Cost tracking +repowise costs # total LLM spend to date +repowise costs --by operation # grouped by operation type +repowise costs --by model # grouped by model +repowise costs --by day # grouped by day # Decisions repowise decision add # record a decision (interactive) @@ -348,7 +399,8 @@ repowise generate-claude-md # regenerate CLAUDE.md # Utilities repowise export [PATH] # export wiki as markdown files -repowise doctor # check setup, API keys, connectivity +repowise doctor # check setup, API keys, store drift +repowise doctor --repair # check and fix detected store mismatches repowise reindex # rebuild vector store (no LLM calls) ``` diff --git 
a/packages/cli/src/repowise/cli/commands/costs_cmd.py b/packages/cli/src/repowise/cli/commands/costs_cmd.py new file mode 100644 index 0000000..000eb84 --- /dev/null +++ b/packages/cli/src/repowise/cli/commands/costs_cmd.py @@ -0,0 +1,157 @@ +"""``repowise costs`` — display LLM cost history from the cost ledger.""" + +from __future__ import annotations + +from datetime import datetime +from pathlib import Path +from typing import Any + +import click +from rich.table import Table + +from repowise.cli.helpers import ( + console, + get_db_url_for_repo, + resolve_repo_path, + run_async, +) + + +def _parse_date(value: str | None) -> datetime | None: + """Parse an ISO date string into a datetime, or return None.""" + if value is None: + return None + try: + return datetime.fromisoformat(value) + except ValueError: + try: + from dateutil.parser import parse as _parse # type: ignore[import-untyped] + + return _parse(value) + except Exception as exc: + raise click.BadParameter(f"Cannot parse date '{value}': {exc}") from exc + + +@click.command("costs") +@click.argument("path", required=False, default=None) +@click.option( + "--since", + default=None, + metavar="DATE", + help="Only show costs since this date (ISO format, e.g. 2026-01-01).", +) +@click.option( + "--by", + "group_by", + type=click.Choice(["operation", "model", "day"]), + default="operation", + show_default=True, + help="Group costs by operation, model, or day.", +) +@click.option( + "--repo-path", + "repo_path_flag", + default=None, + metavar="PATH", + help="Repository path (defaults to current directory).", +) +def costs_command( + path: str | None, + since: str | None, + group_by: str, + repo_path_flag: str | None, +) -> None: + """Show LLM cost history for a repository. + + PATH (or --repo-path) defaults to the current directory. + """ + # Support both positional PATH and --repo-path flag + raw_path = path or repo_path_flag + repo_path = resolve_repo_path(raw_path) + + repowise_dir = repo_path / ".repowise" + if not repowise_dir.exists(): + console.print("[yellow]No .repowise/ directory found. Run 'repowise init' first.[/yellow]") + return + + since_dt = _parse_date(since) + + rows = run_async(_query_costs(repo_path, since=since_dt, group_by=group_by)) + + if not rows: + msg = "No cost records found" + if since_dt: + msg += f" since {since_dt.date()}" + msg += ". Run 'repowise init' with an LLM provider to generate costs." 
+ console.print(f"[yellow]{msg}[/yellow]") + return + + # Build table + group_label = group_by.capitalize() + table = Table( + title=f"LLM Costs — grouped by {group_by}", + border_style="dim", + show_footer=True, + ) + table.add_column(group_label, style="cyan", footer="[bold]TOTAL[/bold]") + table.add_column("Calls", justify="right", footer=str(sum(r["calls"] for r in rows))) + table.add_column( + "Input Tokens", + justify="right", + footer=f"{sum(r['input_tokens'] for r in rows):,}", + ) + table.add_column( + "Output Tokens", + justify="right", + footer=f"{sum(r['output_tokens'] for r in rows):,}", + ) + table.add_column( + "Cost USD", + justify="right", + footer=f"[bold green]${sum(r['cost_usd'] for r in rows):.4f}[/bold green]", + ) + + for row in rows: + table.add_row( + str(row["group"] or "—"), + str(row["calls"]), + f"{row['input_tokens']:,}", + f"{row['output_tokens']:,}", + f"[green]${row['cost_usd']:.4f}[/green]", + ) + + console.print() + console.print(table) + console.print() + + +async def _query_costs( + repo_path: Path, + since: datetime | None, + group_by: str, +) -> list[dict[str, Any]]: + """Open the DB, look up the repo, and return aggregated cost rows.""" + from repowise.core.generation.cost_tracker import CostTracker + from repowise.core.persistence import ( + create_engine, + create_session_factory, + get_session, + init_db, + ) + from repowise.core.persistence.crud import get_repository_by_path + + url = get_db_url_for_repo(repo_path) + engine = create_engine(url) + await init_db(engine) + sf = create_session_factory(engine) + + try: + async with get_session(sf) as session: + repo = await get_repository_by_path(session, str(repo_path)) + if repo is None: + return [] + + tracker = CostTracker(session_factory=sf, repo_id=repo.id) + return await tracker.totals(since=since, group_by=group_by) + finally: + await engine.dispose() diff --git a/packages/cli/src/repowise/cli/commands/dead_code_cmd.py b/packages/cli/src/repowise/cli/commands/dead_code_cmd.py index ce4d7fc..e196cfd 100644 --- a/packages/cli/src/repowise/cli/commands/dead_code_cmd.py +++ b/packages/cli/src/repowise/cli/commands/dead_code_cmd.py @@ -30,12 +30,40 @@ type=click.Choice(["table", "json", "md"]), help="Output format.", ) +@click.option( + "--include-internals/--no-include-internals", + default=False, + help="Detect unused private/internal symbols (higher false-positive rate, off by default).", +) +@click.option( + "--include-zombie-packages/--no-include-zombie-packages", + default=True, + help="Detect monorepo packages with no external importers (on by default).", +) +@click.option( + "--no-unreachable", + "no_unreachable", + is_flag=True, + default=False, + help="Skip detection of unreachable files (in_degree=0).", +) +@click.option( + "--no-unused-exports", + "no_unused_exports", + is_flag=True, + default=False, + help="Skip detection of unused public exports.", +) def dead_code_command( path: str | None, min_confidence: float, safe_only: bool, kind: str | None, fmt: str, + include_internals: bool, + include_zombie_packages: bool, + no_unreachable: bool, + no_unused_exports: bool, ) -> None: """Detect dead and unused code.""" from pathlib import Path as PathlibPath @@ -74,9 +102,15 @@ def dead_code_command( pass # Analyze - config = {"min_confidence": min_confidence} + config: dict = { + "min_confidence": min_confidence, + "detect_unused_internals": include_internals, + "detect_zombie_packages": include_zombie_packages, + "detect_unreachable_files": not no_unreachable, + "detect_unused_exports": 
not no_unused_exports, + } if kind: - # Enable only the requested kind + # --kind overrides the individual detection flags to focus on one type config["detect_unreachable_files"] = kind == "unreachable_file" config["detect_unused_exports"] = kind == "unused_export" config["detect_unused_internals"] = kind == "unused_internal" diff --git a/packages/cli/src/repowise/cli/commands/doctor_cmd.py b/packages/cli/src/repowise/cli/commands/doctor_cmd.py index 0416eb6..bf25b01 100644 --- a/packages/cli/src/repowise/cli/commands/doctor_cmd.py +++ b/packages/cli/src/repowise/cli/commands/doctor_cmd.py @@ -226,6 +226,77 @@ async def _check_stores(): except Exception: checks.append(_check("Store consistency", True, "Could not check")) + # 10. AtomicStorageCoordinator drift check + coord_drift: float | None = None + coord_sql_pages: int | None = None + coord_vector_count: int | None = None + coord_graph_nodes: int | None = None + if db_ok: + try: + + async def _check_coordinator(): + from repowise.core.persistence import ( + create_engine, + create_session_factory, + get_session, + ) + from repowise.core.persistence.coordinator import AtomicStorageCoordinator + from repowise.core.persistence.vector_store import LanceDBVectorStore + from repowise.core.providers.embedding.base import MockEmbedder + + url = get_db_url_for_repo(repo_path) + engine = create_engine(url) + sf = create_session_factory(engine) + + vector_store = None + lance_dir = repowise_dir / "lancedb" + if lance_dir.exists(): + try: + embedder = MockEmbedder() + vector_store = LanceDBVectorStore(str(lance_dir), embedder=embedder) + except Exception: + pass + + async with get_session(sf) as session: + coord = AtomicStorageCoordinator( + session, graph_builder=None, vector_store=vector_store + ) + result = await coord.health_check() + + if vector_store is not None: + try: + await vector_store.close() + except Exception: + pass + await engine.dispose() + return result + + coord_result = run_async(_check_coordinator()) + coord_sql_pages = coord_result.get("sql_pages") + coord_vector_count = coord_result.get("vector_count") + coord_graph_nodes = coord_result.get("graph_nodes") + coord_drift = coord_result.get("drift") + + drift_pct = f"{coord_drift * 100:.1f}%" if coord_drift is not None else "N/A" + if coord_drift is None: + drift_color = "white" + elif coord_drift < 0.05: + drift_color = "green" + elif coord_drift < 0.15: + drift_color = "yellow" + else: + drift_color = "red" + + vec_display = str(coord_vector_count) if coord_vector_count != -1 and coord_vector_count is not None else "unknown" + drift_detail = ( + f"SQL={coord_sql_pages}, Vector={vec_display}, " + f"Drift=[{drift_color}]{drift_pct}[/{drift_color}]" + ) + coord_ok = coord_drift is None or coord_drift < 0.05 + checks.append(_check("Coordinator drift", coord_ok, drift_detail)) + except Exception as exc: + checks.append(_check("Coordinator drift", True, f"Could not check: {exc}")) + # Display table = Table(title="repowise Doctor") table.add_column("Check", style="cyan") diff --git a/packages/cli/src/repowise/cli/commands/init_cmd.py b/packages/cli/src/repowise/cli/commands/init_cmd.py index ba706e8..1ba9bec 100644 --- a/packages/cli/src/repowise/cli/commands/init_cmd.py +++ b/packages/cli/src/repowise/cli/commands/init_cmd.py @@ -411,6 +411,7 @@ def init_command( BarColumn(), TextColumn("{task.completed}/{task.total}"), TimeElapsedColumn(), + TextColumn("[green]${task.fields[cost]:.3f}[/green]"), console=console, ) as progress_bar: callback = RichProgressCallback(progress_bar, 
console) @@ -520,10 +521,51 @@ def init_command( BarColumn(), TextColumn("{task.completed}/{task.total}"), TimeElapsedColumn(), + TextColumn("[green]${task.fields[cost]:.3f}[/green]"), console=console, ) as gen_progress: gen_callback = RichProgressCallback(gen_progress, console) + # Construct a CostTracker backed by the real DB so every LLM call + # is persisted to the llm_costs table. We need the repo_id from the + # database row that was created/upserted during _persist_result + # (which has not run yet), so we look it up or fall back to in-memory. + from repowise.core.generation.cost_tracker import CostTracker + from repowise.cli.helpers import get_db_url_for_repo + from repowise.core.persistence import ( + create_engine as _create_engine, + create_session_factory as _create_sf, + get_session as _get_session, + init_db as _init_db, + upsert_repository as _upsert_repo, + ) + + async def _make_cost_tracker() -> CostTracker: + url = get_db_url_for_repo(repo_path) + engine = _create_engine(url) + await _init_db(engine) + sf = _create_sf(engine) + async with _get_session(sf) as _sess: + _repo = await _upsert_repo( + _sess, + name=result.repo_name, + local_path=str(repo_path), + ) + _repo_id = _repo.id + # Keep engine alive for the duration of generation — it will be + # disposed by _persist_result's own engine later. + return CostTracker(session_factory=sf, repo_id=_repo_id) + + try: + cost_tracker = run_async(_make_cost_tracker()) + except Exception: + # Fallback to in-memory tracker if DB setup fails + cost_tracker = CostTracker() + + # Attach tracker to provider unconditionally (all providers now + # accept _cost_tracker as an attribute) + provider._cost_tracker = cost_tracker + generated_pages = run_async( run_generation( repo_path=repo_path, @@ -538,6 +580,7 @@ def init_command( concurrency=concurrency, progress=gen_callback, resume=resume, + cost_tracker=cost_tracker, ) ) diff --git a/packages/cli/src/repowise/cli/main.py b/packages/cli/src/repowise/cli/main.py index 02dd069..7fa3650 100644 --- a/packages/cli/src/repowise/cli/main.py +++ b/packages/cli/src/repowise/cli/main.py @@ -6,6 +6,7 @@ from repowise.cli import __version__ from repowise.cli.commands.claude_md_cmd import claude_md_command +from repowise.cli.commands.costs_cmd import costs_command from repowise.cli.commands.dead_code_cmd import dead_code_command from repowise.cli.commands.decision_cmd import decision_group from repowise.cli.commands.doctor_cmd import doctor_command @@ -28,6 +29,7 @@ def cli() -> None: cli.add_command(init_command) cli.add_command(claude_md_command) +cli.add_command(costs_command) cli.add_command(update_command) cli.add_command(dead_code_command) cli.add_command(decision_group) diff --git a/packages/cli/src/repowise/cli/ui.py b/packages/cli/src/repowise/cli/ui.py index 3c500d1..633c052 100644 --- a/packages/cli/src/repowise/cli/ui.py +++ b/packages/cli/src/repowise/cli/ui.py @@ -569,7 +569,9 @@ def on_phase_start(self, phase: str, total: int | None) -> None: if phase in self._tasks: self._progress.update(self._tasks[phase], total=total, visible=True) else: - self._tasks[phase] = self._progress.add_task(label, total=total, visible=True) + self._tasks[phase] = self._progress.add_task( + label, total=total, visible=True, cost=0.0 + ) def on_item_done(self, phase: str) -> None: if phase in self._tasks: @@ -582,3 +584,11 @@ def on_message(self, level: str, text: str) -> None: self._progress.console.print(f" [{style}]{text}[/{style}]") else: self._progress.console.print(f" {text}") + + def set_cost(self, 
total_cost: float) -> None: + """Update the live cost display on all active progress tasks.""" + for task_id in self._tasks.values(): + try: + self._progress.update(task_id, cost=total_cost) + except Exception: + pass diff --git a/packages/core/alembic/versions/0009_llm_costs.py b/packages/core/alembic/versions/0009_llm_costs.py new file mode 100644 index 0000000..328ec5d --- /dev/null +++ b/packages/core/alembic/versions/0009_llm_costs.py @@ -0,0 +1,54 @@ +"""Add llm_costs table for runtime LLM cost tracking. + +Revision ID: 0009 +Revises: 0008 +Create Date: 2026-04-07 +""" + +from __future__ import annotations + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +# revision identifiers +revision: str = "0009" +down_revision: str | None = "0008" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.create_table( + "llm_costs", + sa.Column("id", sa.Integer, primary_key=True, autoincrement=True), + sa.Column( + "repository_id", + sa.String(32), + sa.ForeignKey("repositories.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column( + "ts", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("CURRENT_TIMESTAMP"), + ), + sa.Column("model", sa.String(100), nullable=False), + sa.Column("operation", sa.String(50), nullable=False), + sa.Column("input_tokens", sa.Integer, nullable=False), + sa.Column("output_tokens", sa.Integer, nullable=False), + sa.Column("cost_usd", sa.Float, nullable=False), + sa.Column("file_path", sa.String(1024), nullable=True), + ) + op.create_index( + "ix_llm_costs_repository_ts", + "llm_costs", + ["repository_id", "ts"], + ) + + +def downgrade() -> None: + op.drop_index("ix_llm_costs_repository_ts", table_name="llm_costs") + op.drop_table("llm_costs") diff --git a/packages/core/alembic/versions/0010_temporal_hotspot_score.py b/packages/core/alembic/versions/0010_temporal_hotspot_score.py new file mode 100644 index 0000000..b2ce829 --- /dev/null +++ b/packages/core/alembic/versions/0010_temporal_hotspot_score.py @@ -0,0 +1,38 @@ +"""Add temporal_hotspot_score column to git_metadata. + +Stores an exponentially time-decayed churn score used as the primary +signal for hotspot percentile ranking (PERCENT_RANK window function). + +Revision ID: 0010 +Revises: 0009 +Create Date: 2026-04-07 +""" + +from __future__ import annotations + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +# revision identifiers +revision: str = "0010" +down_revision: str | None = "0009" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.add_column( + "git_metadata", + sa.Column( + "temporal_hotspot_score", + sa.Float, + nullable=True, + server_default="0.0", + ), + ) + + +def downgrade() -> None: + op.drop_column("git_metadata", "temporal_hotspot_score") diff --git a/packages/core/alembic/versions/0011_security_findings.py b/packages/core/alembic/versions/0011_security_findings.py new file mode 100644 index 0000000..86e01f6 --- /dev/null +++ b/packages/core/alembic/versions/0011_security_findings.py @@ -0,0 +1,51 @@ +"""Add security_findings table. + +Stores lightweight security signals detected during file ingestion, +including eval/exec calls, hardcoded secrets, raw SQL, weak hashes, etc. 
+ +Revision ID: 0011 +Revises: 0010 +Create Date: 2026-04-07 +""" + +from __future__ import annotations + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +# revision identifiers +revision: str = "0011" +down_revision: str | None = "0010" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.create_table( + "security_findings", + sa.Column("id", sa.Integer, primary_key=True, autoincrement=True), + sa.Column("repository_id", sa.String(32), nullable=False), + sa.Column("file_path", sa.String(1024), nullable=False), + sa.Column("kind", sa.String(100), nullable=False), + sa.Column("severity", sa.String(20), nullable=False), + sa.Column("snippet", sa.Text, nullable=True), + sa.Column("line_number", sa.Integer, nullable=True), + sa.Column( + "detected_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.func.now(), + ), + ) + op.create_index( + "ix_security_findings_repo_file", + "security_findings", + ["repository_id", "file_path"], + ) + + +def downgrade() -> None: + op.drop_index("ix_security_findings_repo_file", table_name="security_findings") + op.drop_table("security_findings") diff --git a/packages/core/src/repowise/core/analysis/pr_blast.py b/packages/core/src/repowise/core/analysis/pr_blast.py new file mode 100644 index 0000000..3404714 --- /dev/null +++ b/packages/core/src/repowise/core/analysis/pr_blast.py @@ -0,0 +1,282 @@ +"""PR blast radius analyzer. + +Given a set of changed files, computes: + - Direct risk per file (hotspot * centrality) + - Transitive affected files (graph ancestors up to max_depth) + - Co-change warnings (historical co-change partners NOT in the PR) + - Recommended reviewers (top owners of affected files) + - Test gaps (affected files without a corresponding test file) + - Overall risk score (0-10) + +Reuses existing data: graph_nodes/graph_edges (SQL), git_metadata, and the +co_change_partners_json field stored in git_metadata rows. +""" + +from __future__ import annotations + +import json +import os +from collections import defaultdict +from typing import Any + +from sqlalchemy import select, text +from sqlalchemy.ext.asyncio import AsyncSession + +from repowise.core.persistence.models import GitMetadata, GraphNode + + +class PRBlastRadiusAnalyzer: + """Compute blast radius for a proposed PR given its changed files.""" + + def __init__(self, session: AsyncSession, repo_id: str) -> None: + self._session = session + self._repo_id = repo_id + + async def analyze_files( + self, + changed_files: list[str], + max_depth: int = 3, + ) -> dict: + """Return full blast-radius analysis for the given changed files. + + Parameters + ---------- + changed_files: + Relative file paths that are modified in the PR. + max_depth: + Maximum BFS depth for transitive ancestor lookup. + """ + changed_set = set(changed_files) + + # 1. Per-file direct risk + direct_risks = await self._score_files(changed_files) + + # 2. Transitive affected files + transitive_affected = await self._transitive_affected(changed_files, max_depth) + all_affected_paths = list(changed_set | {e["path"] for e in transitive_affected}) + + # 3. Co-change warnings + cochange_warnings = await self._cochange_warnings(changed_files, changed_set) + + # 4. Recommended reviewers (over all affected files) + recommended_reviewers = await self._recommend_reviewers(all_affected_paths) + + # 5. Test gaps + test_gaps = await self._find_test_gaps(all_affected_paths) + + # 6. 
Overall risk score (0-10) + overall_risk_score = self._compute_overall_risk(direct_risks, transitive_affected) + + return { + "direct_risks": direct_risks, + "transitive_affected": transitive_affected, + "cochange_warnings": cochange_warnings, + "recommended_reviewers": recommended_reviewers, + "test_gaps": test_gaps, + "overall_risk_score": overall_risk_score, + } + + # ------------------------------------------------------------------ + # Private helpers + # ------------------------------------------------------------------ + + async def _score_files(self, paths: list[str]) -> list[dict]: + """Return direct risk records for each changed file.""" + if not paths: + return [] + + # Fetch git_metadata for all paths in one query + res = await self._session.execute( + select(GitMetadata).where( + GitMetadata.repository_id == self._repo_id, + GitMetadata.file_path.in_(paths), + ) + ) + meta_by_path: dict[str, Any] = {m.file_path: m for m in res.scalars().all()} + + # Fetch graph node pagerank (used as centrality proxy) + node_res = await self._session.execute( + select(GraphNode).where( + GraphNode.repository_id == self._repo_id, + GraphNode.node_id.in_(paths), + ) + ) + node_by_path: dict[str, Any] = {n.node_id: n for n in node_res.scalars().all()} + + results = [] + for path in paths: + meta = meta_by_path.get(path) + node = node_by_path.get(path) + temporal = float(getattr(meta, "temporal_hotspot_score", 0.0) or 0.0) + centrality = float(getattr(node, "pagerank", 0.0) or 0.0) + risk_score = self._score_file(temporal, centrality) + results.append( + { + "path": path, + "risk_score": round(risk_score, 4), + "temporal_hotspot": round(temporal, 4), + "centrality": round(centrality, 6), + } + ) + + results.sort(key=lambda x: -x["risk_score"]) + return results + + @staticmethod + def _score_file(temporal_hotspot_score: float, centrality: float) -> float: + """Compute file-level risk: centrality * (1 + temporal_hotspot_score).""" + return centrality * (1.0 + temporal_hotspot_score) + + async def _transitive_affected( + self, changed_files: list[str], max_depth: int + ) -> list[dict]: + """BFS over reverse graph edges (source_node_id -> target_node_id direction). + + We want files that *import* the changed files (i.e. are affected when a + changed file changes). In graph_edges, an edge means + ``source imports target``, so we look for rows where + ``target_node_id IN (frontier)`` and collect the ``source_node_id`` + values — those are the files that depend on our changed set. 
+ """ + visited: dict[str, int] = {} # path -> depth at which it was first reached + frontier = list(set(changed_files)) + + for depth in range(1, max_depth + 1): + if not frontier: + break + # SQLite / SQLAlchemy compatible IN query via text() + placeholders = ",".join(f":p{i}" for i in range(len(frontier))) + params: dict[str, Any] = {"repo_id": self._repo_id} + params.update({f"p{i}": v for i, v in enumerate(frontier)}) + rows = await self._session.execute( + text( + f"SELECT DISTINCT source_node_id FROM graph_edges " + f"WHERE repository_id = :repo_id " + f"AND target_node_id IN ({placeholders})" + ), + params, + ) + next_frontier = [] + for (src,) in rows: + if src not in visited and src not in set(changed_files): + visited[src] = depth + next_frontier.append(src) + frontier = next_frontier + + return [{"path": p, "depth": d} for p, d in sorted(visited.items(), key=lambda x: x[1])] + + async def _cochange_warnings( + self, changed_files: list[str], changed_set: set[str] + ) -> list[dict]: + """Return co-change partners of changed files that are NOT in the PR.""" + if not changed_files: + return [] + + res = await self._session.execute( + select(GitMetadata).where( + GitMetadata.repository_id == self._repo_id, + GitMetadata.file_path.in_(changed_files), + ) + ) + + warnings = [] + for meta in res.scalars().all(): + partners = json.loads(meta.co_change_partners_json or "[]") + for partner in partners: + partner_path = partner.get("file_path") or partner.get("path") or "" + score = float(partner.get("co_change_count") or partner.get("count") or 0) + if partner_path and partner_path not in changed_set: + warnings.append( + { + "changed": meta.file_path, + "missing_partner": partner_path, + "score": score, + } + ) + + warnings.sort(key=lambda x: -x["score"]) + return warnings + + async def _recommend_reviewers(self, affected_files: list[str]) -> list[dict]: + """Aggregate top owners of affected files; return top 5.""" + if not affected_files: + return [] + + res = await self._session.execute( + select(GitMetadata).where( + GitMetadata.repository_id == self._repo_id, + GitMetadata.file_path.in_(affected_files), + ) + ) + + owner_files: dict[str, list[float]] = defaultdict(list) + for meta in res.scalars().all(): + email = meta.primary_owner_email or "" + pct = float(meta.primary_owner_commit_pct or 0.0) + if email: + owner_files[email].append(pct) + + reviewers = [ + { + "email": email, + "files": len(pcts), + "ownership_pct": round(sum(pcts) / len(pcts), 3) if pcts else 0.0, + } + for email, pcts in owner_files.items() + ] + reviewers.sort(key=lambda x: (-x["files"], -x["ownership_pct"])) + return reviewers[:5] + + async def _find_test_gaps(self, affected_files: list[str]) -> list[str]: + """Return files that lack a corresponding test file. + + Checks graph_nodes for paths matching test_, _test, or + .spec.* patterns. + """ + if not affected_files: + return [] + + node_res = await self._session.execute( + select(GraphNode.node_id).where( + GraphNode.repository_id == self._repo_id, + GraphNode.is_test == True, # noqa: E712 + ) + ) + test_paths = {row[0] for row in node_res.all()} + + gaps = [] + for path in affected_files: + base = os.path.splitext(os.path.basename(path))[0] + ext = os.path.splitext(path)[1].lstrip(".") + has_test = any( + ( + f"test_{base}" in tp + or f"{base}_test" in tp + or f"{base}.spec.{ext}" in tp + or f"{base}.spec." 
in tp + ) + for tp in test_paths + ) + if not has_test: + gaps.append(path) + + return gaps + + @staticmethod + def _compute_overall_risk( + direct_risks: list[dict], + transitive_affected: list[dict], + ) -> float: + """Compute overall risk score on 0-10 scale.""" + if not direct_risks: + return 0.0 + + avg_direct = sum(r["risk_score"] for r in direct_risks) / len(direct_risks) + max_direct = max(r["risk_score"] for r in direct_risks) + breadth_bonus = min(len(transitive_affected) / 20.0, 1.0) # 0-1 + + # Weighted: 40% avg, 40% max, 20% breadth — scaled to 10 + raw = (0.4 * avg_direct + 0.4 * max_direct + 0.2 * breadth_bonus) + # Normalise pagerank-based scores (typically << 1) to 0-10 + score = min(raw * 100.0, 10.0) + return round(score, 2) diff --git a/packages/core/src/repowise/core/analysis/security_scan.py b/packages/core/src/repowise/core/analysis/security_scan.py new file mode 100644 index 0000000..d9abc66 --- /dev/null +++ b/packages/core/src/repowise/core/analysis/security_scan.py @@ -0,0 +1,128 @@ +"""Lightweight security signal extractor. + +Scans indexed symbols and source for keyword/regex patterns that indicate +authentication, secret handling, raw SQL, dangerous deserialization, etc. + +Stores findings in the security_findings table (see migration 0011). +""" + +from __future__ import annotations + +import re +from datetime import UTC, datetime +from typing import Any + +from sqlalchemy.ext.asyncio import AsyncSession + +# --------------------------------------------------------------------------- +# Pattern registry: (compiled_pattern, kind_label, severity) +# --------------------------------------------------------------------------- +_PATTERNS: list[tuple[re.Pattern, str, str]] = [ + (re.compile(r"eval\s*\("), "eval_call", "high"), + (re.compile(r"exec\s*\("), "exec_call", "high"), + (re.compile(r"pickle\.loads"), "pickle_loads", "high"), + (re.compile(r"subprocess\..*shell\s*=\s*True"), "subprocess_shell_true", "high"), + (re.compile(r"os\.system"), "os_system", "high"), + (re.compile(r"password\s*=\s*['\"]"), "hardcoded_password", "high"), + (re.compile(r"(?:api_?key|secret)\s*=\s*['\"]"), "hardcoded_secret", "high"), + (re.compile(r'f[\'"].*SELECT.*\{.*\}'), "fstring_sql", "med"), + (re.compile(r'\.execute\(\s*[\'\"]\s*SELECT.*\+'), "concat_sql", "med"), + (re.compile(r"verify\s*=\s*False"), "tls_verify_false", "med"), + (re.compile(r"\bmd5\b|\bsha1\b"), "weak_hash", "low"), +] + +# Symbol names that are informational security hotspots +_SYMBOL_KEYWORDS = re.compile( + r"\b(auth|token|password|jwt|session|crypto)\b", re.IGNORECASE +) + + +class SecurityScanner: + """Scan a single file for security signals and persist to the database.""" + + def __init__(self, session: AsyncSession, repo_id: str) -> None: + self._session = session + self._repo_id = repo_id + + async def scan_file( + self, + file_path: str, + source: str, + symbols: list[Any], + ) -> list[dict]: + """Scan *source* text and symbol names; return list of finding dicts. + + Parameters + ---------- + file_path: + Relative path of the file (for reference only; not used in scan). + source: + Full text content of the file. + symbols: + List of symbol objects that have a ``name`` attribute (or similar). 
+ """ + findings: list[dict] = [] + lines = source.splitlines() + + # Line-by-line pattern scan + for lineno, line in enumerate(lines, start=1): + for pattern, kind, severity in _PATTERNS: + if pattern.search(line): + # Trim snippet to keep it concise + snippet = line.strip()[:120] + findings.append( + { + "kind": kind, + "severity": severity, + "snippet": snippet, + "line": lineno, + } + ) + + # Symbol-name scan (informational / low) + for sym in symbols: + name = getattr(sym, "name", "") or getattr(sym, "qualified_name", "") or "" + if name and _SYMBOL_KEYWORDS.search(name): + findings.append( + { + "kind": "security_sensitive_symbol", + "severity": "low", + "snippet": name, + "line": getattr(sym, "start_line", 0) or 0, + } + ) + + return findings + + async def persist(self, file_path: str, findings: list[dict]) -> None: + """Insert security findings into the security_findings table. + + Uses raw INSERT to stay independent of any ORM session state. + Silently skips if the table doesn't exist yet (pre-migration). + """ + from sqlalchemy import text + + if not findings: + return + + now = datetime.now(UTC) + for finding in findings: + try: + await self._session.execute( + text( + "INSERT INTO security_findings " + "(repository_id, file_path, kind, severity, snippet, line_number, detected_at) " + "VALUES (:repo_id, :file_path, :kind, :severity, :snippet, :line, :detected_at)" + ), + { + "repo_id": self._repo_id, + "file_path": file_path, + "kind": finding["kind"], + "severity": finding["severity"], + "snippet": finding.get("snippet", ""), + "line": finding.get("line", 0), + "detected_at": now, + }, + ) + except Exception: # noqa: BLE001 — table may not exist pre-migration + break diff --git a/packages/core/src/repowise/core/generation/cost_tracker.py b/packages/core/src/repowise/core/generation/cost_tracker.py new file mode 100644 index 0000000..542c131 --- /dev/null +++ b/packages/core/src/repowise/core/generation/cost_tracker.py @@ -0,0 +1,264 @@ +"""Runtime LLM cost tracking for repowise. + +Tracks token usage and cost per session, and optionally persists rows to +the ``llm_costs`` table for historical reporting via ``repowise costs``. 
+""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any + +import structlog + +log = structlog.get_logger(__name__) + +# --------------------------------------------------------------------------- +# Pricing table — USD per 1 million tokens +# --------------------------------------------------------------------------- + +_PRICING: dict[str, dict[str, float]] = { + # Anthropic + "claude-opus-4-6": {"input": 15.0, "output": 75.0}, + "claude-sonnet-4-6": {"input": 3.0, "output": 15.0}, + "claude-haiku-4-5": {"input": 0.8, "output": 4.0}, + "claude-3-5-sonnet-20241022": {"input": 3.0, "output": 15.0}, + # OpenAI + "gpt-4o": {"input": 2.5, "output": 10.0}, + "gpt-4o-mini": {"input": 0.15, "output": 0.6}, + # Google Gemini + "gemini-2.0-flash": {"input": 0.075, "output": 0.3}, + "gemini-2.5-flash": {"input": 0.30, "output": 2.50}, + "gemini-2.5-pro": {"input": 1.25, "output": 10.0}, + "gemini-1.5-flash": {"input": 0.075, "output": 0.30}, + "gemini-1.5-flash-lite": {"input": 0.075, "output": 0.30}, + "gemini-1.5-pro": {"input": 1.25, "output": 5.0}, + # Gemini preview / experimental models + "gemini-3.1-flash-lite-preview": {"input": 0.075, "output": 0.30}, + "gemini-3-flash-preview": {"input": 0.075, "output": 0.30}, +} + +_FALLBACK_PRICING: dict[str, float] = {"input": 3.0, "output": 15.0} + +# Track which unknown models we've already warned about (per-process) +_warned_models: set[str] = set() + + +def _get_pricing(model: str) -> dict[str, float]: + """Return pricing for *model*, falling back and warning if unknown.""" + if model in _PRICING: + return _PRICING[model] + if model not in _warned_models: + log.warning("cost_tracker.unknown_model", model=model, fallback=_FALLBACK_PRICING) + _warned_models.add(model) + return _FALLBACK_PRICING + + +# --------------------------------------------------------------------------- +# CostTracker +# --------------------------------------------------------------------------- + + +class CostTracker: + """Tracks LLM token usage and cost for a session. + + Optionally persists each call to the ``llm_costs`` table when a + *session_factory* (async SQLAlchemy sessionmaker) is supplied. + + Parameters + ---------- + session_factory: + Async SQLAlchemy sessionmaker. When ``None``, only in-memory + tracking is performed. + repo_id: + Repository primary key to associate with persisted rows. + """ + + def __init__( + self, + session_factory: Any | None = None, + repo_id: str | None = None, + ) -> None: + self._session_factory = session_factory + self._repo_id = repo_id + self._session_cost: float = 0.0 + self._session_tokens: int = 0 + + # ------------------------------------------------------------------ + # Properties + # ------------------------------------------------------------------ + + @property + def session_cost(self) -> float: + """Cumulative USD cost for this tracker instance.""" + return self._session_cost + + @property + def session_tokens(self) -> int: + """Cumulative tokens (input + output) for this tracker instance.""" + return self._session_tokens + + # ------------------------------------------------------------------ + # Recording + # ------------------------------------------------------------------ + + async def record( + self, + model: str, + input_tokens: int, + output_tokens: int, + operation: str, + file_path: str | None = None, + ) -> float: + """Record a single LLM call and return its cost in USD. + + Parameters + ---------- + model: + Model identifier, e.g. ``"claude-sonnet-4-6"``. 
+ input_tokens: + Number of input/prompt tokens consumed. + output_tokens: + Number of output/completion tokens consumed. + operation: + Logical operation label, e.g. ``"doc_generation"`` or + ``"embedding"``. + file_path: + Source file being processed, if available. + + Returns + ------- + float + Cost in USD for this call. + """ + pricing = _get_pricing(model) + cost = (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000 + + self._session_cost += cost + self._session_tokens += input_tokens + output_tokens + + log.debug( + "cost_tracker.record", + model=model, + input_tokens=input_tokens, + output_tokens=output_tokens, + cost_usd=cost, + operation=operation, + file_path=file_path, + ) + + if self._session_factory is not None and self._repo_id is not None: + await self._persist( + model=model, + input_tokens=input_tokens, + output_tokens=output_tokens, + cost_usd=cost, + operation=operation, + file_path=file_path, + ) + + return cost + + async def _persist( + self, + *, + model: str, + input_tokens: int, + output_tokens: int, + cost_usd: float, + operation: str, + file_path: str | None, + ) -> None: + """Write a row to the ``llm_costs`` table.""" + try: + from repowise.core.persistence.models import LlmCost + from repowise.core.persistence import get_session + + async with get_session(self._session_factory) as session: + row = LlmCost( + repository_id=self._repo_id, + model=model, + operation=operation, + input_tokens=input_tokens, + output_tokens=output_tokens, + cost_usd=cost_usd, + file_path=file_path, + ) + session.add(row) + await session.commit() + except Exception as exc: + log.warning("cost_tracker.persist_failed", error=str(exc)) + + # ------------------------------------------------------------------ + # Querying + # ------------------------------------------------------------------ + + async def totals( + self, + since: datetime | None = None, + group_by: str = "operation", + ) -> list[dict]: + """Query aggregate cost totals from the database. + + Parameters + ---------- + since: + Only include rows whose ``ts`` is on or after this datetime. + group_by: + Grouping dimension: ``"operation"``, ``"model"``, or ``"day"``. + + Returns + ------- + list[dict] + Each dict has keys: ``group``, ``calls``, ``input_tokens``, + ``output_tokens``, ``cost_usd``. 
+ """ + if self._session_factory is None or self._repo_id is None: + return [] + + try: + import sqlalchemy as sa + from repowise.core.persistence.models import LlmCost + from repowise.core.persistence import get_session + + async with get_session(self._session_factory) as session: + if group_by == "model": + group_col = LlmCost.model + elif group_by == "day": + # SQLite strftime; works for Postgres too with cast + group_col = sa.func.strftime("%Y-%m-%d", LlmCost.ts) + else: + group_col = LlmCost.operation + + stmt = ( + sa.select( + group_col.label("group"), + sa.func.count().label("calls"), + sa.func.sum(LlmCost.input_tokens).label("input_tokens"), + sa.func.sum(LlmCost.output_tokens).label("output_tokens"), + sa.func.sum(LlmCost.cost_usd).label("cost_usd"), + ) + .where(LlmCost.repository_id == self._repo_id) + .group_by(group_col) + .order_by(sa.func.sum(LlmCost.cost_usd).desc()) + ) + + if since is not None: + stmt = stmt.where(LlmCost.ts >= since) + + result = await session.execute(stmt) + rows = result.fetchall() + + return [ + { + "group": row.group, + "calls": row.calls, + "input_tokens": row.input_tokens or 0, + "output_tokens": row.output_tokens or 0, + "cost_usd": row.cost_usd or 0.0, + } + for row in rows + ] + except Exception as exc: + log.warning("cost_tracker.totals_failed", error=str(exc)) + return [] diff --git a/packages/core/src/repowise/core/generation/page_generator.py b/packages/core/src/repowise/core/generation/page_generator.py index 84843cb..4f0c9d3 100644 --- a/packages/core/src/repowise/core/generation/page_generator.py +++ b/packages/core/src/repowise/core/generation/page_generator.py @@ -482,6 +482,7 @@ async def guarded_named(page_id: str, coro: Any) -> Any: # Embed page for RAG (B1) if self._vector_store is not None and isinstance(result, GeneratedPage): try: + page_summary = _extract_summary(result.content) await self._vector_store.embed_and_upsert( result.page_id, result.content, @@ -489,6 +490,7 @@ async def guarded_named(page_id: str, coro: Any) -> Any: "page_type": result.page_type, "target_path": result.target_path, "content": result.content[:600], + "summary": page_summary, }, ) except Exception as e: @@ -668,10 +670,70 @@ async def guarded_named(page_id: str, coro: Any) -> Any: # Context is assembled for ALL code files (module pages need it). # Pages are generated only for files that cross the significance bar. # page_summaries from level 0+1 are available here (B2). + # + # Topo-sort: process leaves (no internal out-edges) before roots so that + # dependency summaries are available when assembling dependents' contexts. + # Falls back to existing priority order if networkx is unavailable or graph + # has cycles. + code_file_paths = [p.file_info.path for p in code_files] + try: + import networkx as nx # type: ignore[import] + + # Build a subgraph of just the code files we are about to generate + code_file_set = set(code_file_paths) + dag = nx.DiGraph() + dag.add_nodes_from(code_file_paths) + for path_ in code_file_paths: + if path_ in graph: + for succ in graph.successors(path_): + if succ in code_file_set: + dag.add_edge(path_, succ) # path_ depends on succ + + if nx.is_directed_acyclic_graph(dag): + # topological_sort yields nodes in an order where for each edge u→v, + # u comes before v — i.e. dependents before dependencies. + # We want leaves (dependencies) first, so reverse the order. + topo_order = list(reversed(list(nx.topological_sort(dag)))) + else: + # Cycle present: condense SCCs, topo-sort condensation, then expand. 
+ condensation = nx.condensation(dag) + topo_order_scc = list(reversed(list(nx.topological_sort(condensation)))) + scc_members: dict[int, list[str]] = { + n: list(condensation.nodes[n]["members"]) for n in condensation.nodes + } + topo_order = [ + node for scc_id in topo_order_scc for node in scc_members[scc_id] + ] + + # Preserve priority ordering within the topo-sort by mapping paths to + # their original priority index. + priority_index = {p: i for i, p in enumerate(code_file_paths)} + topo_order = [p for p in topo_order if p in priority_index] + # Re-sort code_files to match topo_order + path_to_parsed = {p.file_info.path: p for p in code_files} + code_files = [path_to_parsed[p] for p in topo_order if p in path_to_parsed] + except Exception: + pass # Keep existing priority order on any failure + file_page_contexts: dict[str, FilePageContext] = {} level2_coros: list[tuple[str, Any]] = [] for p in code_files: + # Pre-fetch dependency summaries from vector store for deps not yet + # in the completed_page_summaries accumulator (e.g. from prior runs). + if self._vector_store is not None: + path_ = p.file_info.path + out_edges = list(graph.successors(path_)) if path_ in graph else [] + internal_deps = [e for e in out_edges if not e.startswith("external:")] + for dep in internal_deps: + if dep not in completed_page_summaries: + try: + result = await self._vector_store.get_page_summary_by_path(dep) + if result and result.get("summary"): + completed_page_summaries[dep] = result["summary"] + except Exception: + pass # Non-fatal — dep context is optional + ctx = self._assembler.assemble_file_page( p, graph, diff --git a/packages/core/src/repowise/core/generation/templates/file_page.j2 b/packages/core/src/repowise/core/generation/templates/file_page.j2 index 2a90e43..8206d8f 100644 --- a/packages/core/src/repowise/core/generation/templates/file_page.j2 +++ b/packages/core/src/repowise/core/generation/templates/file_page.j2 @@ -51,6 +51,15 @@ Module docstring: {{ ctx.docstring }} {% endfor %} {% endif %} +{% if ctx.dependency_summaries %} +## Dependency Context +The following are summaries of files that this file imports. 
Use these to understand how the imports are intended to be used: +{% for dep_path, dep_summary in ctx.dependency_summaries.items() %} +### `{{ dep_path }}` +{{ dep_summary }} +{% endfor %} +{% endif %} + {% if ctx.file_source_snippet %} ## Source Snippet ```{{ ctx.language }} diff --git a/packages/core/src/repowise/core/ingestion/dynamic_hints/__init__.py b/packages/core/src/repowise/core/ingestion/dynamic_hints/__init__.py new file mode 100644 index 0000000..95af349 --- /dev/null +++ b/packages/core/src/repowise/core/ingestion/dynamic_hints/__init__.py @@ -0,0 +1,6 @@ +from __future__ import annotations + +from .base import DynamicEdge, DynamicHintExtractor +from .registry import HintRegistry + +__all__ = ["HintRegistry", "DynamicEdge", "DynamicHintExtractor"] diff --git a/packages/core/src/repowise/core/ingestion/dynamic_hints/base.py b/packages/core/src/repowise/core/ingestion/dynamic_hints/base.py new file mode 100644 index 0000000..25f2541 --- /dev/null +++ b/packages/core/src/repowise/core/ingestion/dynamic_hints/base.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from pathlib import Path + + +@dataclass +class DynamicEdge: + source: str # repo-relative path + target: str # repo-relative path + edge_type: str # "dynamic_uses" | "dynamic_imports" | "url_route" + hint_source: str # extractor name + weight: float = 1.0 + + +class DynamicHintExtractor(ABC): + name: str + + @abstractmethod + def extract(self, repo_root: Path) -> list[DynamicEdge]: ... diff --git a/packages/core/src/repowise/core/ingestion/dynamic_hints/django.py b/packages/core/src/repowise/core/ingestion/dynamic_hints/django.py new file mode 100644 index 0000000..6d1e4a4 --- /dev/null +++ b/packages/core/src/repowise/core/ingestion/dynamic_hints/django.py @@ -0,0 +1,152 @@ +from __future__ import annotations + +import ast +import re +from pathlib import Path + +from .base import DynamicEdge, DynamicHintExtractor + + +def _app_to_path(app: str, repo_root: Path) -> str | None: + """Attempt to resolve a dotted app name to an __init__.py under repo_root.""" + # Try direct directory: myapp/__init__.py + direct = repo_root / app / "__init__.py" + if direct.exists(): + return str(direct.relative_to(repo_root).as_posix()) + # Try dotted path: myapp.sub → myapp/sub/__init__.py + dotted = app.replace(".", "/") + "/__init__.py" + dotted_path = repo_root / dotted + if dotted_path.exists(): + return str(dotted_path.relative_to(repo_root).as_posix()) + return None + + +def _module_to_path(module: str, repo_root: Path) -> str | None: + """Attempt to resolve a dotted module string to a .py file under repo_root.""" + as_path = module.replace(".", "/") + # Try as a .py file directly + candidate = repo_root / (as_path + ".py") + if candidate.exists(): + return str(candidate.relative_to(repo_root).as_posix()) + # Try as __init__.py inside a package + candidate = repo_root / as_path / "__init__.py" + if candidate.exists(): + return str(candidate.relative_to(repo_root).as_posix()) + return None + + +def _extract_string_list(node: ast.expr) -> list[str]: + """Extract string literals from an ast.List node.""" + results: list[str] = [] + if not isinstance(node, ast.List): + return results + for elt in node.elts: + if isinstance(elt, ast.Constant) and isinstance(elt.value, str): + results.append(elt.value) + return results + + +def _extract_string_value(node: ast.expr) -> str | None: + """Extract a string literal value from a node.""" + if isinstance(node, 
ast.Constant) and isinstance(node.value, str): + return node.value + return None + + +class DjangoDynamicHints(DynamicHintExtractor): + name = "django_settings" + + def extract(self, repo_root: Path) -> list[DynamicEdge]: + edges: list[DynamicEdge] = [] + edges.extend(self._scan_settings(repo_root)) + edges.extend(self._scan_urls(repo_root)) + return edges + + def _scan_settings(self, repo_root: Path) -> list[DynamicEdge]: + edges: list[DynamicEdge] = [] + + # Collect all settings files + settings_files: list[Path] = list(repo_root.rglob("settings.py")) + for settings_dir in repo_root.rglob("settings"): + if settings_dir.is_dir(): + settings_files.extend(settings_dir.glob("*.py")) + + for settings_file in settings_files: + try: + source = settings_file.read_text(encoding="utf-8", errors="ignore") + tree = ast.parse(source, filename=str(settings_file)) + except Exception: + continue + + try: + rel_settings = settings_file.relative_to(repo_root).as_posix() + except ValueError: + continue + + for node in ast.walk(tree): + if not isinstance(node, ast.Assign): + continue + for target in node.targets: + if not (isinstance(target, ast.Name)): + continue + name = target.id + + if name == "INSTALLED_APPS": + for app in _extract_string_list(node.value): + resolved = _app_to_path(app, repo_root) + if resolved: + edges.append(DynamicEdge( + source=rel_settings, + target=resolved, + edge_type="dynamic_imports", + hint_source=self.name, + )) + + elif name == "ROOT_URLCONF": + module = _extract_string_value(node.value) + if module: + resolved = _module_to_path(module, repo_root) + if resolved: + edges.append(DynamicEdge( + source=rel_settings, + target=resolved, + edge_type="dynamic_imports", + hint_source=self.name, + )) + + elif name == "MIDDLEWARE": + for middleware in _extract_string_list(node.value): + resolved = _module_to_path(middleware, repo_root) + if resolved: + edges.append(DynamicEdge( + source=rel_settings, + target=resolved, + edge_type="dynamic_imports", + hint_source=self.name, + )) + + return edges + + def _scan_urls(self, repo_root: Path) -> list[DynamicEdge]: + edges: list[DynamicEdge] = [] + include_re = re.compile(r"""include\(\s*['\"]([\w\.]+)['\"]""") + + for urls_file in repo_root.rglob("urls.py"): + try: + source = urls_file.read_text(encoding="utf-8", errors="ignore") + rel_urls = urls_file.relative_to(repo_root).as_posix() + except Exception: + continue + + for match in include_re.finditer(source): + module = match.group(1) + resolved = _module_to_path(module, repo_root) + if resolved: + edges.append(DynamicEdge( + source=rel_urls, + target=resolved, + edge_type="url_route", + hint_source=self.name, + )) + + return edges diff --git a/packages/core/src/repowise/core/ingestion/dynamic_hints/node.py b/packages/core/src/repowise/core/ingestion/dynamic_hints/node.py new file mode 100644 index 0000000..62e6e11 --- /dev/null +++ b/packages/core/src/repowise/core/ingestion/dynamic_hints/node.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +import json +import re +from pathlib import Path +from typing import Any + +from .base import DynamicEdge, DynamicHintExtractor + + +def _json_loads_lenient(text: str) -> Any: + """Try json.loads; on failure, strip trailing commas and retry.""" + try: + return json.loads(text) + except json.JSONDecodeError: + cleaned = re.sub(r",\s*([}\]])", r"\1", text) + return json.loads(cleaned) + + +def _collect_export_strings(obj: Any) -> list[str]: + """Recursively collect string values from an exports object.""" + results: list[str] = [] + if 
isinstance(obj, str): + results.append(obj) + elif isinstance(obj, dict): + for v in obj.values(): + results.extend(_collect_export_strings(v)) + elif isinstance(obj, list): + for item in obj: + results.extend(_collect_export_strings(item)) + return results + + +class NodeDynamicHints(DynamicHintExtractor): + name = "node_package" + + def extract(self, repo_root: Path) -> list[DynamicEdge]: + edges: list[DynamicEdge] = [] + edges.extend(self._scan_package_json(repo_root)) + edges.extend(self._scan_tsconfig(repo_root)) + return edges + + def _scan_package_json(self, repo_root: Path) -> list[DynamicEdge]: + edges: list[DynamicEdge] = [] + + for pkg_file in repo_root.rglob("package.json"): + # Skip node_modules + if "node_modules" in pkg_file.parts: + continue + try: + text = pkg_file.read_text(encoding="utf-8", errors="ignore") + data = json.loads(text) + rel_pkg = pkg_file.relative_to(repo_root).as_posix() + pkg_dir = pkg_file.parent + except Exception: + continue + + # Collect entry point fields and exports strings + candidates: list[str] = [] + for field in ("main", "module", "browser"): + val = data.get(field) + if isinstance(val, str): + candidates.append(val) + + exports = data.get("exports") + if exports is not None: + candidates.extend(_collect_export_strings(exports)) + + for candidate in candidates: + if not candidate.startswith("."): + # Only resolve relative paths + continue + resolved = (pkg_dir / candidate).resolve() + try: + rel_resolved = resolved.relative_to(repo_root.resolve()).as_posix() + except ValueError: + continue + if resolved.exists(): + edges.append(DynamicEdge( + source=rel_pkg, + target=rel_resolved, + edge_type="dynamic_imports", + hint_source=self.name, + )) + + return edges + + def _scan_tsconfig(self, repo_root: Path) -> list[DynamicEdge]: + edges: list[DynamicEdge] = [] + + for tsconfig in repo_root.rglob("tsconfig*.json"): + if "node_modules" in tsconfig.parts: + continue + try: + text = tsconfig.read_text(encoding="utf-8", errors="ignore") + data = _json_loads_lenient(text) + rel_tsconfig = tsconfig.relative_to(repo_root).as_posix() + tsconfig_dir = tsconfig.parent + except Exception: + continue + + compiler_options = data.get("compilerOptions", {}) + if not isinstance(compiler_options, dict): + continue + + paths = compiler_options.get("paths") + if not isinstance(paths, dict): + continue + + base_url = compiler_options.get("baseUrl", ".") + base_dir = (tsconfig_dir / base_url).resolve() + + for _alias, targets in paths.items(): + if not isinstance(targets, list): + continue + for pattern in targets: + if not isinstance(pattern, str): + continue + # Drop trailing /* from glob patterns + clean = pattern.rstrip("/*").rstrip("/") + if not clean: + continue + resolved = (base_dir / clean).resolve() + try: + rel_resolved = resolved.relative_to(repo_root.resolve()).as_posix() + except ValueError: + continue + if resolved.exists(): + edges.append(DynamicEdge( + source=rel_tsconfig, + target=rel_resolved, + edge_type="dynamic_imports", + hint_source=self.name, + )) + + return edges diff --git a/packages/core/src/repowise/core/ingestion/dynamic_hints/pytest_hints.py b/packages/core/src/repowise/core/ingestion/dynamic_hints/pytest_hints.py new file mode 100644 index 0000000..4585fcd --- /dev/null +++ b/packages/core/src/repowise/core/ingestion/dynamic_hints/pytest_hints.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +import ast +from pathlib import Path + +from .base import DynamicEdge, DynamicHintExtractor + + +def _get_fixture_names(tree: ast.AST) -> 
set[str]: + """Walk an AST and return the names of all @pytest.fixture / @fixture decorated functions.""" + names: set[str] = [] + for node in ast.walk(tree): + if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): + continue + for decorator in node.decorator_list: + if _is_fixture_decorator(decorator): + names.append(node.name) + break + return set(names) + + +def _is_fixture_decorator(decorator: ast.expr) -> bool: + """Return True if the decorator is @fixture or @pytest.fixture (with or without call).""" + # @fixture or @pytest.fixture + if isinstance(decorator, ast.Name) and decorator.id == "fixture": + return True + if isinstance(decorator, ast.Attribute) and decorator.attr == "fixture": + return True + # @fixture(...) or @pytest.fixture(...) + if isinstance(decorator, ast.Call): + return _is_fixture_decorator(decorator.func) + return False + + +def _get_test_function_params(tree: ast.AST) -> set[str]: + """Return all parameter names of test_* functions in the AST.""" + params: set[str] = set() + for node in ast.walk(tree): + if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): + continue + if not node.name.startswith("test_"): + continue + for arg in node.args.args + node.args.posonlyargs + node.args.kwonlyargs: + params.add(arg.arg) + if node.args.vararg: + params.add(node.args.vararg.arg) + if node.args.kwarg: + params.add(node.args.kwarg.arg) + return params + + +class PytestDynamicHints(DynamicHintExtractor): + name = "pytest_conftest" + + def extract(self, repo_root: Path) -> list[DynamicEdge]: + edges: list[DynamicEdge] = [] + + for conftest in repo_root.rglob("conftest.py"): + try: + source = conftest.read_text(encoding="utf-8", errors="ignore") + tree = ast.parse(source, filename=str(conftest)) + rel_conftest = conftest.relative_to(repo_root).as_posix() + except Exception: + continue + + fixture_names = _get_fixture_names(tree) + if not fixture_names: + continue + + conftest_dir = conftest.parent + seen_targets: set[str] = set() + + # Find all test files under the conftest's parent directory + for pattern in ("test_*.py", "*_test.py"): + for test_file in conftest_dir.rglob(pattern): + if test_file == conftest: + continue + try: + rel_test = test_file.relative_to(repo_root).as_posix() + except ValueError: + continue + + if rel_test in seen_targets: + continue + + try: + test_source = test_file.read_text(encoding="utf-8", errors="ignore") + test_tree = ast.parse(test_source, filename=str(test_file)) + except Exception: + continue + + # Check if any test function uses a fixture from this conftest + test_params = _get_test_function_params(test_tree) + if test_params & fixture_names: + seen_targets.add(rel_test) + edges.append(DynamicEdge( + source=rel_conftest, + target=rel_test, + edge_type="dynamic_uses", + hint_source=self.name, + )) + + return edges diff --git a/packages/core/src/repowise/core/ingestion/dynamic_hints/registry.py b/packages/core/src/repowise/core/ingestion/dynamic_hints/registry.py new file mode 100644 index 0000000..16b3750 --- /dev/null +++ b/packages/core/src/repowise/core/ingestion/dynamic_hints/registry.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from pathlib import Path + +import structlog + +from .base import DynamicEdge, DynamicHintExtractor +from .django import DjangoDynamicHints +from .pytest_hints import PytestDynamicHints +from .node import NodeDynamicHints + +log = structlog.get_logger(__name__) + + +class HintRegistry: + def __init__(self, extractors: list[DynamicHintExtractor] | None = None) -> None: + 
self._extractors = extractors or [ + DjangoDynamicHints(), + PytestDynamicHints(), + NodeDynamicHints(), + ] + + def extract_all(self, repo_root: Path) -> list[DynamicEdge]: + edges: list[DynamicEdge] = [] + for ex in self._extractors: + try: + got = ex.extract(repo_root) + edges.extend(got) + log.debug("dynamic_hints", extractor=ex.name, count=len(got)) + except Exception as e: + log.warning("dynamic_hints_failed", extractor=ex.name, error=str(e)) + return edges diff --git a/packages/core/src/repowise/core/ingestion/git_indexer.py b/packages/core/src/repowise/core/ingestion/git_indexer.py index db4a643..b1875cb 100644 --- a/packages/core/src/repowise/core/ingestion/git_indexer.py +++ b/packages/core/src/repowise/core/ingestion/git_indexer.py @@ -100,6 +100,9 @@ def _quiet_del(self: Any) -> None: # Co-change temporal decay: half-life ~125 days (lambda for exp(-t/tau)). _CO_CHANGE_DECAY_TAU: float = 180.0 +# Hotspot temporal decay: half-life for exponentially weighted churn score. +HOTSPOT_HALFLIFE_DAYS: float = 180.0 + # Regex to extract PR/MR numbers from commit messages. # Matches: "#123", "Merge pull request #456", "(#789)", "!42" (GitLab MR) _PR_NUMBER_RE = re.compile(r"(?:pull request |)\#(\d+)|\(#(\d+)\)|!(\d+)") @@ -555,6 +558,8 @@ def _index_file(self, file_path: str, repo: Any) -> dict: # Phase 3 fields "original_path": None, "merge_commit_count_90d": 0, + # Temporal hotspot score (exponentially decayed churn) + "temporal_hotspot_score": 0.0, } try: @@ -672,6 +677,18 @@ def _index_file(self, file_path: str, repo: Any) -> dict: total_churn = meta["lines_added_90d"] + meta["lines_deleted_90d"] meta["avg_commit_size"] = total_churn / c90 if c90 > 0 else 0.0 + # Temporal hotspot score: exponentially decayed per-commit churn. + # Each commit contributes weight * clamped_lines where weight decays + # with a half-life of HOTSPOT_HALFLIFE_DAYS days from now. + _ln2 = math.log(2) + temporal_score = 0.0 + for c in commits: + age_days = max((now.timestamp() - c.ts) / 86400.0, 0.0) + weight = math.exp(-_ln2 * age_days / HOTSPOT_HALFLIFE_DAYS) + lines = min((c.added + c.deleted) / 100.0, 3.0) + temporal_score += weight * lines + meta["temporal_hotspot_score"] = temporal_score + # Contributor count & bus factor meta["contributor_count"] = len(author_counts) total_commits = sum(author_counts.values()) @@ -964,14 +981,21 @@ def _flush_commit() -> None: @staticmethod def _compute_percentiles(metadata_list: list[dict]) -> None: - """Compute churn_percentile and is_hotspot. Mutates in place.""" + """Compute churn_percentile and is_hotspot. Mutates in place. + + Primary sort key is temporal_hotspot_score (exponentially decayed churn); + commit_count_90d is used as a tiebreak, matching the SQL PERCENT_RANK path. 
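For intuition on the decay math behind `temporal_hotspot_score`, here is a standalone sketch of the same formula. It is not project code: the `(timestamp, lines_changed)` pairs are illustrative stand-ins for the real commit objects the indexer iterates.

```python
import math

HOTSPOT_HALFLIFE_DAYS = 180.0
_LN2 = math.log(2)

def decayed_churn(commits: list[tuple[float, int]], now_ts: float) -> float:
    """commits is a list of (commit_timestamp, lines_changed) pairs."""
    score = 0.0
    for ts, lines_changed in commits:
        age_days = max((now_ts - ts) / 86400.0, 0.0)
        weight = math.exp(-_LN2 * age_days / HOTSPOT_HALFLIFE_DAYS)
        score += weight * min(lines_changed / 100.0, 3.0)  # per-commit churn capped at 3.0
    return score

# A 200-line commit made today contributes 2.0 to the score; the same commit made
# 180 days ago contributes 1.0, and 360 days ago 0.5 — each half-life halves its weight.
```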
+ """ if not metadata_list: return - # Sort by commit_count_90d for percentile ranking + # Sort by temporal_hotspot_score (primary) then commit_count_90d (tiebreak) sorted_by_churn = sorted( range(len(metadata_list)), - key=lambda i: metadata_list[i].get("commit_count_90d", 0), + key=lambda i: ( + metadata_list[i].get("temporal_hotspot_score") or 0.0, + metadata_list[i].get("commit_count_90d", 0), + ), ) total = len(metadata_list) diff --git a/packages/core/src/repowise/core/ingestion/graph.py b/packages/core/src/repowise/core/ingestion/graph.py index a003d64..d9d21df 100644 --- a/packages/core/src/repowise/core/ingestion/graph.py +++ b/packages/core/src/repowise/core/ingestion/graph.py @@ -500,6 +500,26 @@ def update_co_change_edges(self, updated_meta: dict, min_count: int = 3) -> None # Re-add co_changes edges self.add_co_change_edges(updated_meta, min_count) + # ------------------------------------------------------------------ + # Dynamic-hint edges + # ------------------------------------------------------------------ + + def add_dynamic_edges(self, edges: list) -> None: + """Add dynamic-hint edges to the graph. Each edge is a DynamicEdge.""" + for e in edges: + if e.source not in self._graph: + continue + if e.target not in self._graph: + # add a stub node so dead-code analysis sees it as reachable + self._graph.add_node(e.target) + self._graph.add_edge( + e.source, + e.target, + edge_type="dynamic", + hint_source=e.hint_source, + weight=e.weight, + ) + # ------------------------------------------------------------------ # Framework-aware synthetic edges # ------------------------------------------------------------------ diff --git a/packages/core/src/repowise/core/persistence/coordinator.py b/packages/core/src/repowise/core/persistence/coordinator.py new file mode 100644 index 0000000..0116ecd --- /dev/null +++ b/packages/core/src/repowise/core/persistence/coordinator.py @@ -0,0 +1,212 @@ +"""Atomic three-store transaction coordinator. + +Buffers writes across SQL (AsyncSession), in-memory graph (GraphBuilder / +NetworkX), and the vector store. Flushes them in order; rolls back on failure. + +Usage: + coord = AtomicStorageCoordinator(session, graph_builder, vector_store) + async with coord.transaction() as txn: + txn.add_sql(some_orm_obj) + txn.add_graph_node("path/file.py", attrs={...}) + txn.add_graph_edge("a.py", "b.py", attrs={...}) + txn.add_vector("page-id", {"path": ..., "summary": ..., "embedding": ...}) + # On normal exit: SQL commit, graph applied, vectors upserted. + # On exception anywhere: SQL rollback, graph nodes/edges removed, vector ids deleted. + +Vector store notes +------------------ +All three stores (InMemoryVectorStore, LanceDBVectorStore, PgVectorStore) share +the same async API: + - upsert: embed_and_upsert(page_id: str, text: str, metadata: dict) -> None + - delete: delete(page_id: str) -> None + - count: __len__() (InMemoryVectorStore only; others unsupported) + +The ``record`` dict passed to ``add_vector`` must contain a ``"text"`` key +(the raw text to embed). All other keys are forwarded as metadata. + +GraphBuilder notes +------------------ +The NetworkX DiGraph is stored as ``GraphBuilder._graph`` (private attribute). +This coordinator accesses it directly via ``getattr(builder, "_graph", None)`` +to avoid triggering a full ``build()`` call. 
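A slightly fuller caller-side sketch of the transaction API described above. The function name, `page_row`, and the attribute names on it are illustrative; the guarantee being exercised is that a failure anywhere inside the block rolls back all three stores. Note that the vector record passes a `"text"` key (which the adapter embeds) rather than a precomputed embedding.

```python
from repowise.core.persistence.coordinator import AtomicStorageCoordinator

async def persist_file_page(coord: AtomicStorageCoordinator, page_row, path: str, dep: str) -> None:
    # Hypothetical caller: page_row is an ORM object; path/dep are repo-relative paths.
    async with coord.transaction() as txn:
        txn.add_sql(page_row)
        txn.add_graph_node(path, attrs={"kind": "file"})
        txn.add_graph_edge(path, dep, attrs={"edge_type": "imports"})
        txn.add_vector(page_row.id, {"text": page_row.content, "target_path": path})
    # If any flush step raises (e.g. the vector upsert), the SQL insert is rolled back,
    # the node/edge are removed from the in-memory graph, and the exception propagates.
```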
+""" +from __future__ import annotations + +import asyncio +from contextlib import asynccontextmanager +from dataclasses import dataclass, field +from typing import Any, AsyncIterator +import structlog + +log = structlog.get_logger(__name__) + + +@dataclass +class _PendingTransaction: + pending_sql_objects: list[Any] = field(default_factory=list) + pending_graph_nodes: list[tuple[str, dict]] = field(default_factory=list) + pending_graph_edges: list[tuple[str, str, dict]] = field(default_factory=list) + pending_vectors: list[tuple[str, dict]] = field(default_factory=list) # (id, record) + + def add_sql(self, obj: Any) -> None: + self.pending_sql_objects.append(obj) + + def add_graph_node(self, node_id: str, attrs: dict | None = None) -> None: + self.pending_graph_nodes.append((node_id, attrs or {})) + + def add_graph_edge(self, src: str, dst: str, attrs: dict | None = None) -> None: + self.pending_graph_edges.append((src, dst, attrs or {})) + + def add_vector(self, vector_id: str, record: dict) -> None: + """Queue a vector upsert. + + ``record`` must contain a ``"text"`` key with the raw text to embed. + All remaining keys are passed as metadata to the vector store. + """ + self.pending_vectors.append((vector_id, record)) + + +class AtomicStorageCoordinator: + def __init__(self, session, graph_builder=None, vector_store=None) -> None: + self._session = session + self._graph_builder = graph_builder + self._vector_store = vector_store + self._lock = asyncio.Lock() + + @asynccontextmanager + async def transaction(self) -> AsyncIterator[_PendingTransaction]: + txn = _PendingTransaction() + applied_nodes: list[str] = [] + applied_edges: list[tuple[str, str]] = [] + applied_vector_ids: list[str] = [] + async with self._lock: + try: + yield txn + # === FLUSH === + # 1. SQL first (most likely to fail constraints) + for obj in txn.pending_sql_objects: + self._session.add(obj) + await self._session.flush() + + # 2. Graph (in-memory; track for rollback) + # Access _graph directly to avoid triggering a full build() call. + if self._graph_builder is not None: + g = getattr(self._graph_builder, "_graph", None) + if g is not None: + for node_id, attrs in txn.pending_graph_nodes: + existed = node_id in g + g.add_node(node_id, **attrs) + if not existed: + applied_nodes.append(node_id) + for src, dst, attrs in txn.pending_graph_edges: + if not g.has_edge(src, dst): + g.add_edge(src, dst, **attrs) + applied_edges.append((src, dst)) + + # 3. 
Vector store last + # All stores expose: embed_and_upsert(page_id, text, metadata) async + if self._vector_store is not None and txn.pending_vectors: + for vid, record in txn.pending_vectors: + await _vector_upsert(self._vector_store, vid, record) + applied_vector_ids.append(vid) + + await self._session.commit() + log.debug( + "atomic_txn_committed", + sql=len(txn.pending_sql_objects), + nodes=len(applied_nodes), + edges=len(applied_edges), + vectors=len(applied_vector_ids), + ) + except Exception as e: + log.warning("atomic_txn_failed_rollback", error=str(e)) + # SQL rollback + try: + await self._session.rollback() + except Exception as e2: + log.error("sql_rollback_failed", error=str(e2)) + # Graph rollback + if self._graph_builder is not None: + g = getattr(self._graph_builder, "_graph", None) + if g is not None: + for src, dst in applied_edges: + if g.has_edge(src, dst): + g.remove_edge(src, dst) + for node_id in applied_nodes: + if node_id in g: + g.remove_node(node_id) + # Vector rollback — all stores expose delete(page_id) async + if self._vector_store is not None: + for vid in applied_vector_ids: + try: + await _vector_delete(self._vector_store, vid) + except Exception as e2: + log.error("vector_rollback_failed", id=vid, error=str(e2)) + raise + + async def health_check(self) -> dict: + """Compare counts across stores. Returns drift report.""" + from sqlalchemy import text + report: dict = {"sql_pages": None, "vector_count": None, "graph_nodes": None, "drift": None} + try: + res = await self._session.execute(text("SELECT COUNT(*) FROM wiki_pages")) + report["sql_pages"] = int(res.scalar() or 0) + except Exception as e: + report["sql_pages_error"] = str(e) + if self._graph_builder is not None: + g = getattr(self._graph_builder, "_graph", None) + if g is not None: + report["graph_nodes"] = g.number_of_nodes() + if self._vector_store is not None: + try: + report["vector_count"] = await _vector_count(self._vector_store) + except Exception as e: + report["vector_count_error"] = str(e) + # Compute drift between SQL and vector if both available + if report["sql_pages"] is not None and report["vector_count"] is not None: + denom = max(report["sql_pages"], 1) + report["drift"] = abs(report["sql_pages"] - report["vector_count"]) / denom + return report + + +# --------------------------------------------------------------------------- +# Vector store adapter helpers +# +# All three stores (InMemoryVectorStore, LanceDBVectorStore, PgVectorStore) +# share the same method names: +# upsert: embed_and_upsert(page_id: str, text: str, metadata: dict) -> None (async) +# delete: delete(page_id: str) -> None (async) +# count: __len__() (sync, InMemoryVectorStore only; others return -1) +# --------------------------------------------------------------------------- + +async def _vector_upsert(store, vid: str, record: dict) -> None: + """Call embed_and_upsert on the store. + + ``record`` must contain a ``"text"`` key. All other keys are forwarded + as metadata. Raises ValueError if ``"text"`` is absent. + """ + text = record.get("text") + if text is None: + raise ValueError( + f"_vector_upsert: record for '{vid}' is missing required 'text' key. 
" + f"Keys present: {list(record.keys())}" + ) + metadata = {k: v for k, v in record.items() if k != "text"} + await store.embed_and_upsert(vid, text, metadata) + + +async def _vector_delete(store, vid: str) -> None: + """Call delete(page_id) on the store.""" + await store.delete(vid) + + +async def _vector_count(store) -> int: + """Return the number of vectors in the store. + + InMemoryVectorStore exposes __len__; LanceDB and PgVector do not have a + cheap count method, so we return -1 for those. + """ + fn = getattr(store, "__len__", None) + if fn is not None: + return int(fn()) + return -1 diff --git a/packages/core/src/repowise/core/persistence/crud.py b/packages/core/src/repowise/core/persistence/crud.py index 00dcf64..577abb8 100644 --- a/packages/core/src/repowise/core/persistence/crud.py +++ b/packages/core/src/repowise/core/persistence/crud.py @@ -18,7 +18,7 @@ from datetime import UTC, datetime from typing import Any -from sqlalchemy import select +from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncSession from .models import ( @@ -709,31 +709,41 @@ async def recompute_git_percentiles( session: AsyncSession, repository_id: str, ) -> int: - """Reload all git_metadata rows and recompute churn_percentile + is_hotspot. + """Recompute churn_percentile + is_hotspot using a SQL PERCENT_RANK window function. Called after incremental updates so that percentile rankings stay fresh without a full ``repowise init``. Returns the number of rows updated. + + Primary ranking signal is temporal_hotspot_score (exponentially decayed churn); + commit_count_90d is the tiebreak. Works on both SQLite (3.25+) and PostgreSQL. """ - result = await session.execute( + # First check how many rows exist so we can return the count without an + # extra query after the UPDATE. 
+ count_result = await session.execute( select(GitMetadata).where(GitMetadata.repository_id == repository_id) ) - rows = result.scalars().all() + rows = count_result.scalars().all() if not rows: return 0 - # Sort by 90-day commit count for percentile ranking - sorted_rows = sorted(rows, key=lambda r: r.commit_count_90d or 0) - total = len(sorted_rows) - - for rank, row in enumerate(sorted_rows): - pct = rank / total if total > 0 else 0.0 - row.churn_percentile = pct - c90 = row.commit_count_90d or 0 - row.is_hotspot = pct >= 0.75 and c90 > 0 - row.updated_at = _now_utc() - + sql = """ +WITH ranked AS ( + SELECT id, PERCENT_RANK() OVER ( + PARTITION BY repository_id + ORDER BY COALESCE(temporal_hotspot_score, 0.0), commit_count_90d + ) AS prank + FROM git_metadata + WHERE repository_id = :repo_id +) +UPDATE git_metadata +SET churn_percentile = (SELECT prank FROM ranked WHERE ranked.id = git_metadata.id), + is_hotspot = ((SELECT prank FROM ranked WHERE ranked.id = git_metadata.id) >= 0.75 + AND git_metadata.commit_count_90d > 0) +WHERE repository_id = :repo_id; +""" + await session.execute(text(sql), {"repo_id": repository_id}) await session.flush() - return total + return len(rows) # --------------------------------------------------------------------------- diff --git a/packages/core/src/repowise/core/persistence/models.py b/packages/core/src/repowise/core/persistence/models.py index 3b28500..1ab2229 100644 --- a/packages/core/src/repowise/core/persistence/models.py +++ b/packages/core/src/repowise/core/persistence/models.py @@ -299,6 +299,9 @@ class GitMetadata(Base): original_path: Mapped[str | None] = mapped_column(Text, nullable=True) merge_commit_count_90d: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + # Temporal hotspot score: exponentially time-decayed churn signal + temporal_hotspot_score: Mapped[float | None] = mapped_column(Float, nullable=True, default=0.0) + created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, default=_now_utc ) @@ -401,6 +404,45 @@ class ChatMessage(Base): ) +class LlmCost(Base): + """A single LLM API call cost record.""" + + __tablename__ = "llm_costs" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + repository_id: Mapped[str] = mapped_column( + String(32), ForeignKey("repositories.id", ondelete="CASCADE"), nullable=False + ) + ts: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, default=_now_utc + ) + model: Mapped[str] = mapped_column(String(100), nullable=False) + operation: Mapped[str] = mapped_column(String(50), nullable=False) + input_tokens: Mapped[int] = mapped_column(Integer, nullable=False) + output_tokens: Mapped[int] = mapped_column(Integer, nullable=False) + cost_usd: Mapped[float] = mapped_column(Float, nullable=False) + file_path: Mapped[str | None] = mapped_column(String(1024), nullable=True) + + +class SecurityFinding(Base): + """A security signal detected during file ingestion.""" + + __tablename__ = "security_findings" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + repository_id: Mapped[str] = mapped_column( + String(32), ForeignKey("repositories.id", ondelete="CASCADE"), nullable=False + ) + file_path: Mapped[str] = mapped_column(String(1024), nullable=False) + kind: Mapped[str] = mapped_column(String(100), nullable=False) + severity: Mapped[str] = mapped_column(String(20), nullable=False) + snippet: Mapped[str | None] = mapped_column(Text, nullable=True) + line_number: Mapped[int | 
None] = mapped_column(Integer, nullable=True) + detected_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, default=_now_utc + ) + + class DeadCodeFinding(Base): """Dead code finding: unreachable files, unused exports, zombie packages.""" diff --git a/packages/core/src/repowise/core/persistence/vector_store.py b/packages/core/src/repowise/core/persistence/vector_store.py index dce05c0..14e431b 100644 --- a/packages/core/src/repowise/core/persistence/vector_store.py +++ b/packages/core/src/repowise/core/persistence/vector_store.py @@ -76,6 +76,14 @@ async def list_page_ids(self) -> set[str]: """ return set() # default: empty (subclasses should override) + async def get_page_summary_by_path(self, path: str) -> dict | None: + """Return {'summary': str, 'key_exports': list[str]} for a previously-indexed page, or None. + + Used for RAG context injection during doc generation: when generating page B + that imports A, we fetch A's previously-generated summary and feed it to the LLM. + """ + return None # default: no-op (subclasses should override) + # --------------------------------------------------------------------------- # InMemoryVectorStore @@ -145,6 +153,23 @@ async def close(self) -> None: async def list_page_ids(self) -> set[str]: return set(self._store.keys()) + async def get_page_summary_by_path(self, path: str) -> dict | None: + """Return {'summary': str, 'key_exports': list[str]} for a previously-indexed page, or None. + + Used for RAG context injection during doc generation: when generating page B + that imports A, we fetch A's previously-generated summary and feed it to the LLM. + + Implementation note: reads 'summary' from metadata if present (set by the + generation pipeline), else falls back to the first 500 chars of 'content'. + 'key_exports' reads the 'exports' metadata field if present, else []. + """ + for _pid, (_, meta) in self._store.items(): + if meta.get("target_path") == path: + summary = meta.get("summary") or str(meta.get("content", ""))[:500] + key_exports = meta.get("exports") or [] + return {"summary": summary, "key_exports": list(key_exports)} + return None + def __len__(self) -> int: return len(self._store) @@ -294,6 +319,38 @@ async def list_page_ids(self) -> set[str]: rows = await self._table.query().select(["page_id"]).to_list() # type: ignore[union-attr] return {r["page_id"] for r in rows} + async def get_page_summary_by_path(self, path: str) -> dict | None: + """Return {'summary': str, 'key_exports': list[str]} for a previously-indexed page, or None. + + Used for RAG context injection during doc generation: when generating page B + that imports A, we fetch A's previously-generated summary and feed it to the LLM. + + Implementation note: LanceDB stores up to 200 chars of content in 'content_snippet'. + We use that as the summary. 'key_exports' is not stored in the LanceDB schema, so + we return [] — the caller only uses the text summary for prompt injection. 
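A sketch of how the generation side is expected to consume this lookup, under the assumption that `ctx` is the template context for the `file_page` prompt shown earlier. The helper name and the loop are illustrative; only the `{'summary', 'key_exports'}` return shape comes from this interface.

```python
async def build_dependency_summaries(vector_store, import_paths: list[str]) -> dict[str, str]:
    """Fetch previously generated summaries for a file's imports (illustrative helper)."""
    summaries: dict[str, str] = {}
    for dep_path in import_paths:
        hit = await vector_store.get_page_summary_by_path(dep_path)
        if hit and hit["summary"]:
            summaries[dep_path] = hit["summary"]
    return summaries

# ctx.dependency_summaries = await build_dependency_summaries(store, imports_of_current_file)
# feeds the "Use these to understand how the imports are intended to be used" template block.
```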
+ """ + await self._ensure_connected() + if self._table is None: + return None + + safe_path = path.replace("'", "''") + try: + rows = ( + await self._table.query() # type: ignore[union-attr] + .where(f"target_path = '{safe_path}'") + .select(["content_snippet"]) + .limit(1) + .to_list() + ) + except Exception: + return None + + if not rows: + return None + + summary = rows[0].get("content_snippet") or "" + return {"summary": str(summary), "key_exports": []} + # --------------------------------------------------------------------------- # PgVectorStore @@ -390,3 +447,44 @@ async def list_page_ids(self) -> set[str]: sa_text("SELECT id FROM wiki_pages WHERE embedding IS NOT NULL") ) return {r[0] for r in rows.fetchall()} + + async def get_page_summary_by_path(self, path: str) -> dict | None: + """Return {'summary': str, 'key_exports': list[str]} for a previously-indexed page, or None. + + Used for RAG context injection during doc generation: when generating page B + that imports A, we fetch A's previously-generated summary and feed it to the LLM. + + Implementation note: reads the 'content' column (first 500 chars) from the + wiki_pages table matched by target_path. 'key_exports' is derived from the + page's exports if stored in a metadata JSON column; otherwise returns []. + """ + from sqlalchemy.sql import text as sa_text + + async with self._session_factory() as session: + rows = await session.execute( + sa_text( + "SELECT content, metadata FROM wiki_pages " + "WHERE target_path = :path " + "LIMIT 1" + ), + {"path": path}, + ) + row = rows.fetchone() + + if row is None: + return None + + content = str(row[0] or "")[:500] + # Extract key_exports from metadata JSON column if present + key_exports: list[str] = [] + if row[1] and isinstance(row[1], dict): + key_exports = list(row[1].get("exports", [])) + elif row[1] and isinstance(row[1], str): + import json + try: + meta = json.loads(row[1]) + key_exports = list(meta.get("exports", [])) + except (json.JSONDecodeError, AttributeError): + pass + + return {"summary": content, "key_exports": key_exports} diff --git a/packages/core/src/repowise/core/pipeline/orchestrator.py b/packages/core/src/repowise/core/pipeline/orchestrator.py index dfae3d1..a7c390c 100644 --- a/packages/core/src/repowise/core/pipeline/orchestrator.py +++ b/packages/core/src/repowise/core/pipeline/orchestrator.py @@ -15,8 +15,9 @@ from __future__ import annotations import asyncio +import os import time -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from dataclasses import dataclass, field from pathlib import Path from typing import Any @@ -27,6 +28,35 @@ logger = structlog.get_logger(__name__) + +# --------------------------------------------------------------------------- +# Process-pool worker (module-level — must be picklable) +# --------------------------------------------------------------------------- + +# Module-level process-local parser cache (one per worker process). +_WORKER_PARSER: Any = None + + +def _parse_one(path_and_fi_and_bytes: tuple) -> Any: + """Worker function for ProcessPoolExecutor parsing. + + Constructs (or reuses) a process-local ASTParser and parses one file. + Returns a ParsedFile on success, or (abs_path_str, error_str) on failure. + The parser is constructed lazily inside the worker — the ASTParser itself + (which holds compiled tree-sitter Language/Query objects) is never pickled. 
+ Only FileInfo (input) and ParsedFile (output) cross the process boundary; + both are plain dataclasses and therefore picklable. + """ + global _WORKER_PARSER + fi, source = path_and_fi_and_bytes + try: + if _WORKER_PARSER is None: + from repowise.core.ingestion import ASTParser + _WORKER_PARSER = ASTParser() + return _WORKER_PARSER.parse_file(fi, source) + except Exception as exc: + return (fi.abs_path, str(exc)) + # Maximum seconds to spend on decision extraction before giving up. # Large repos with tens of thousands of files can take arbitrarily long. DECISION_EXTRACTION_TIMEOUT_SECS = 300 @@ -111,6 +141,7 @@ async def run_pipeline( test_run: bool = False, resume: bool = False, progress: ProgressCallback | None = None, + cost_tracker: Any | None = None, ) -> PipelineResult: """Run the repowise indexing/analysis/generation pipeline. @@ -156,28 +187,47 @@ async def run_pipeline( commit_depth = max(1, min(commit_depth, 5000)) + # Attach cost tracker to provider if supplied + if cost_tracker is not None and llm_client is not None and hasattr(llm_client, "_cost_tracker"): + llm_client._cost_tracker = cost_tracker + # ---- Phase 1: Ingestion ------------------------------------------------ if progress: progress.on_message("info", "Phase 1: Ingestion") - parsed_files, file_infos, repo_structure, source_map, graph_builder = ( - await _run_ingestion( + # Launch git indexing as a background task immediately — it is independent + # of parsing and graph-build, so the two stages can run concurrently. + # _run_ingestion does: traverse → ProcessPool parse → graph build → dynamic hints. + # _run_git_indexing does: git log → co-change accumulation (I/O bound, own executor). + + async def _git_stage() -> tuple: + return await _run_git_indexing( + repo_path, + commit_depth=commit_depth, + follow_renames=follow_renames, + progress=progress, + ) + + async def _ingestion_stage() -> tuple: + return await _run_ingestion( repo_path, exclude_patterns=exclude_patterns, skip_tests=skip_tests, skip_infra=skip_infra, progress=progress, ) - ) - # Git indexing (runs concurrently with ingestion in the CLI, but here - # we start it after traversal since we're already async) - git_summary, git_metadata_list, git_meta_map = await _run_git_indexing( - repo_path, - commit_depth=commit_depth, - follow_renames=follow_renames, - progress=progress, - ) + ( + parsed_files, + file_infos, + repo_structure, + source_map, + graph_builder, + ), ( + git_summary, + git_metadata_list, + git_meta_map, + ) = await asyncio.gather(_ingestion_stage(), _git_stage()) # Add co-change edges to the graph if git_meta_map: @@ -335,31 +385,77 @@ async def _run_ingestion( if fi.language not in ("dockerfile", "makefile", "terraform", "shell") ] - # Parse (sequential — GraphBuilder is not thread-safe) + # ---- Parse phase: CPU-bound, run in ProcessPoolExecutor ---------------- if progress: progress.on_phase_start("parse", len(file_infos)) - parser = ASTParser() + # Read source bytes up front (I/O, sequential — fast enough; keeps worker + # args small: FileInfo + bytes, both picklable plain dataclasses/bytes). 
+ fi_and_bytes: list[tuple] = [] + for fi in file_infos: + try: + source = Path(fi.abs_path).read_bytes() + fi_and_bytes.append((fi, source)) + except Exception: + if progress: + progress.on_item_done("parse") + parsed_files: list[Any] = [] source_map: dict[str, bytes] = {} graph_builder = GraphBuilder(repo_path=repo_path) - for i, fi in enumerate(file_infos): - try: - source = Path(fi.abs_path).read_bytes() - parsed = parser.parse_file(fi, source) - parsed_files.append(parsed) + loop = asyncio.get_running_loop() + workers = max(1, os.cpu_count() or 4) + + _use_process_pool = True + parse_results: list[Any] = [] + + try: + with ProcessPoolExecutor(max_workers=workers) as pool: + tasks = [ + loop.run_in_executor(pool, _parse_one, item) + for item in fi_and_bytes + ] + # Use as_completed via asyncio.as_completed to report per-file progress. + # We need to preserve (task → fi_and_bytes index) for source_map so we + # wrap tasks in a list and drain with gather instead. + parse_results = await asyncio.gather(*tasks, return_exceptions=True) + except Exception as pool_exc: + logger.warning( + "process_pool_parse_failed_falling_back", + error=str(pool_exc), + ) + _use_process_pool = False + # Fallback: in-process sequential parse + _fallback_parser = ASTParser() + for i, (fi, source) in enumerate(fi_and_bytes): + try: + result = _fallback_parser.parse_file(fi, source) + parse_results.append(result) + except Exception as exc: + parse_results.append((fi.abs_path, str(exc))) + if progress: + progress.on_item_done("parse") + if i % 50 == 49: + await asyncio.sleep(0) + + # Aggregate results into GraphBuilder on the main loop (not thread-safe). + for idx, result in enumerate(parse_results): + fi, source = fi_and_bytes[idx] + if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], str): + # Error tuple: (abs_path_str, error_str) + logger.debug("parse_error_in_worker", path=result[0], error=result[1]) + elif isinstance(result, Exception): + logger.debug("parse_exception_in_worker", path=fi.abs_path, error=str(result)) + else: + parsed_files.append(result) source_map[fi.path] = source - graph_builder.add_file(parsed) - except Exception: - pass # skip unparseable files - if progress: + graph_builder.add_file(result) + # Report per-file progress if we used the process pool (fallback already reported above). 
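The `isinstance(result, Exception)` branch above exists because `return_exceptions=True` makes `asyncio.gather` hand back exception objects in place of results instead of raising. A minimal standalone demonstration:

```python
import asyncio

async def ok() -> str:
    return "parsed"

async def boom() -> str:
    raise ValueError("bad parse")

async def main() -> None:
    results = await asyncio.gather(ok(), boom(), return_exceptions=True)
    print(results)  # ['parsed', ValueError('bad parse')] — the error is returned, not raised

asyncio.run(main())
```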
+ if _use_process_pool and progress: progress.on_item_done("parse") - # Yield control every 50 files so the event loop stays responsive - if i % 50 == 49: - await asyncio.sleep(0) - # Build graph (CPU-bound — run in thread to keep event loop free) + # ---- Graph build phase ------------------------------------------------- if progress: progress.on_phase_start("graph", 1) await asyncio.to_thread(graph_builder.build) @@ -373,6 +469,17 @@ async def _run_ingestion( except Exception: pass # framework edge detection is best-effort + # ---- Dynamic hints wiring (after static graph is fully built) ---------- + try: + from repowise.core.ingestion.dynamic_hints import HintRegistry + + registry = HintRegistry() + dynamic_edges = await loop.run_in_executor(None, registry.extract_all, repo_path) + graph_builder.add_dynamic_edges(dynamic_edges) + logger.info("dynamic_hints_added", count=len(dynamic_edges)) + except Exception as hints_exc: + logger.warning("dynamic_hints_failed", error=str(hints_exc)) + if progress: progress.on_item_done("graph") @@ -533,6 +640,7 @@ async def run_generation( concurrency: int, progress: ProgressCallback | None, resume: bool = False, + cost_tracker: Any | None = None, ) -> list[Any]: """Run LLM-powered page generation. @@ -547,6 +655,10 @@ async def run_generation( from repowise.core.persistence.vector_store import InMemoryVectorStore from repowise.core.providers.embedding.base import MockEmbedder + # Attach cost tracker to LLM client if available + if cost_tracker is not None and llm_client is not None and hasattr(llm_client, "_cost_tracker"): + llm_client._cost_tracker = cost_tracker + config = GenerationConfig(max_concurrency=concurrency) assembler = ContextAssembler(config) @@ -573,6 +685,9 @@ def on_page_done(page_type: str) -> None: _pages_done += 1 if progress: progress.on_item_done("generation") + # Push live cost update if the callback supports it + if cost_tracker is not None and hasattr(progress, "set_cost"): + progress.set_cost(cost_tracker.session_cost) if progress: progress.on_phase_start("generation", None) diff --git a/packages/core/src/repowise/core/pipeline/persist.py b/packages/core/src/repowise/core/pipeline/persist.py index 78af265..8c702ac 100644 --- a/packages/core/src/repowise/core/pipeline/persist.py +++ b/packages/core/src/repowise/core/pipeline/persist.py @@ -108,6 +108,25 @@ async def persist_pipeline_result( if all_symbols: await batch_upsert_symbols(session, repo_id, all_symbols) + # ---- Security scan ------------------------------------------------------- + # Choice: persist.py (rather than orchestrator.py) because there is already + # a clear per-file loop over parsed_files here, and the instructions ask for + # a minimal, non-invasive addition. The orchestrator parse stage is owned + # by another agent and must not be touched. 
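The scanner invoked below returns findings that end up in the `security_findings` table. A trimmed-down, purely illustrative sketch of that shape follows; the rule list here is a placeholder of this edit's invention, not the project's actual rule set, which lives in `repowise.core.analysis.security_scan`. The dict keys mirror the `SecurityFinding` columns defined above.

```python
import re

# Placeholder rules for illustration only — the real SecurityScanner defines its own set.
_RULES: dict[str, tuple[str, re.Pattern[str]]] = {
    "dangerous_eval": ("high", re.compile(r"\beval\s*\(")),
    "hardcoded_secret": ("high", re.compile(r"(?i)(api[_-]?key|password)\s*=\s*['\"][^'\"]+['\"]")),
}

def scan_text(file_path: str, source: str) -> list[dict]:
    findings: list[dict] = []
    for lineno, line in enumerate(source.splitlines(), start=1):
        for kind, (severity, pattern) in _RULES.items():
            if pattern.search(line):
                findings.append({
                    "file_path": file_path,
                    "kind": kind,
                    "severity": severity,
                    "snippet": line.strip()[:120],
                    "line_number": lineno,
                })
    return findings
```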
+ try: + from repowise.core.analysis.security_scan import SecurityScanner + + scanner = SecurityScanner(session, repo_id) + for pf in result.parsed_files: + source_text = getattr(pf.file_info, "content", "") or "" + findings = await scanner.scan_file( + pf.file_info.path, source_text, pf.symbols + ) + if findings: + await scanner.persist(pf.file_info.path, findings) + except Exception as _sec_err: # noqa: BLE001 — scanner must never break the pipeline + logger.warning("security_scan_skipped", error=str(_sec_err)) + # ---- Git metadata -------------------------------------------------------- if result.git_metadata_list: await upsert_git_metadata_bulk(session, repo_id, result.git_metadata_list) diff --git a/packages/core/src/repowise/core/providers/llm/anthropic.py b/packages/core/src/repowise/core/providers/llm/anthropic.py index 8d9b0b2..1322100 100644 --- a/packages/core/src/repowise/core/providers/llm/anthropic.py +++ b/packages/core/src/repowise/core/providers/llm/anthropic.py @@ -36,9 +36,12 @@ RateLimitError, ) -from typing import Any, AsyncIterator +from typing import TYPE_CHECKING, Any, AsyncIterator from repowise.core.rate_limiter import RateLimiter +if TYPE_CHECKING: + from repowise.core.generation.cost_tracker import CostTracker + log = structlog.get_logger(__name__) _MAX_RETRIES = 3 @@ -61,6 +64,7 @@ def __init__( api_key: str | None = None, model: str = "claude-sonnet-4-6", rate_limiter: RateLimiter | None = None, + cost_tracker: CostTracker | None = None, ) -> None: resolved_key = api_key or os.environ.get("ANTHROPIC_API_KEY") if not resolved_key: @@ -71,6 +75,7 @@ def __init__( self._client = AsyncAnthropic(api_key=resolved_key) self._model = model self._rate_limiter = rate_limiter + self._cost_tracker = cost_tracker @property def provider_name(self) -> str: @@ -164,6 +169,23 @@ async def _generate_with_retry( cached_tokens=result.cached_tokens, request_id=request_id, ) + + if self._cost_tracker is not None: + import asyncio + + try: + asyncio.get_event_loop().create_task( + self._cost_tracker.record( + model=self._model, + input_tokens=result.input_tokens, + output_tokens=result.output_tokens, + operation="doc_generation", + file_path=None, + ) + ) + except RuntimeError: + pass # No running event loop — skip async record + return result # --- ChatProvider protocol implementation --- diff --git a/packages/core/src/repowise/core/providers/llm/gemini.py b/packages/core/src/repowise/core/providers/llm/gemini.py index 7398c7a..46503dc 100644 --- a/packages/core/src/repowise/core/providers/llm/gemini.py +++ b/packages/core/src/repowise/core/providers/llm/gemini.py @@ -36,9 +36,12 @@ RateLimitError, ) -from typing import Any, AsyncIterator +from typing import TYPE_CHECKING, Any, AsyncIterator from repowise.core.rate_limiter import RateLimiter +if TYPE_CHECKING: + from repowise.core.generation.cost_tracker import CostTracker + log = structlog.get_logger(__name__) _MAX_RETRIES = 3 @@ -53,6 +56,7 @@ class GeminiProvider(BaseProvider): model: Gemini model name. Defaults to gemini-3.1-flash-lite-preview. api_key: Google API key. Falls back to GEMINI_API_KEY or GOOGLE_API_KEY env var. rate_limiter: Optional RateLimiter instance. + cost_tracker: Optional CostTracker for recording token usage and cost. 
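For a feel of what a single `CostTracker.record(...)` call amounts to in dollars, here is a standalone sketch of the usual per-million-token arithmetic. The prices are made-up placeholders, not the project's pricing table.

```python
# Placeholder prices (USD per million tokens) — illustrative only.
PRICE_PER_MTOK: dict[str, tuple[float, float]] = {"example-model": (3.00, 15.00)}

def call_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    input_price, output_price = PRICE_PER_MTOK[model]
    return input_tokens / 1_000_000 * input_price + output_tokens / 1_000_000 * output_price

print(round(call_cost_usd("example-model", 12_000, 800), 6))  # 0.048
```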
""" def __init__( @@ -60,6 +64,7 @@ def __init__( model: str = "gemini-3.1-flash-lite-preview", api_key: str | None = None, rate_limiter: RateLimiter | None = None, + cost_tracker: "CostTracker | None" = None, ) -> None: self._model = model self._api_key = ( @@ -73,6 +78,7 @@ def __init__( "No API key found. Pass api_key= or set GEMINI_API_KEY / GOOGLE_API_KEY env var.", ) self._rate_limiter = rate_limiter + self._cost_tracker = cost_tracker self._client: object | None = None # cached; created once on first call @property @@ -185,6 +191,21 @@ def _call_sync() -> GeneratedResponse: output_tokens=result.output_tokens, request_id=request_id, ) + + if self._cost_tracker is not None: + try: + asyncio.get_event_loop().create_task( + self._cost_tracker.record( + model=self._model, + input_tokens=result.input_tokens, + output_tokens=result.output_tokens, + operation="doc_generation", + file_path=None, + ) + ) + except RuntimeError: + pass # No running event loop — skip async record + return result # --- ChatProvider protocol implementation --- diff --git a/packages/core/src/repowise/core/providers/llm/litellm.py b/packages/core/src/repowise/core/providers/llm/litellm.py index 0692cd9..1273e22 100644 --- a/packages/core/src/repowise/core/providers/llm/litellm.py +++ b/packages/core/src/repowise/core/providers/llm/litellm.py @@ -37,9 +37,12 @@ RateLimitError, ) -from typing import Any, AsyncIterator +from typing import TYPE_CHECKING, Any, AsyncIterator from repowise.core.rate_limiter import RateLimiter +if TYPE_CHECKING: + from repowise.core.generation.cost_tracker import CostTracker + log = structlog.get_logger(__name__) _MAX_RETRIES = 3 @@ -64,11 +67,13 @@ def __init__( api_key: str | None = None, api_base: str | None = None, rate_limiter: RateLimiter | None = None, + cost_tracker: "CostTracker | None" = None, ) -> None: self._model = model self._api_key = api_key self._api_base = api_base self._rate_limiter = rate_limiter + self._cost_tracker = cost_tracker @property def provider_name(self) -> str: @@ -162,6 +167,23 @@ async def _generate_with_retry( output_tokens=result.output_tokens, request_id=request_id, ) + + if self._cost_tracker is not None: + import asyncio + + try: + asyncio.get_event_loop().create_task( + self._cost_tracker.record( + model=self._model, + input_tokens=result.input_tokens, + output_tokens=result.output_tokens, + operation="doc_generation", + file_path=None, + ) + ) + except RuntimeError: + pass # No running event loop — skip async record + return result # --- ChatProvider protocol implementation --- diff --git a/packages/core/src/repowise/core/providers/llm/openai.py b/packages/core/src/repowise/core/providers/llm/openai.py index b13248b..1cc3e7b 100644 --- a/packages/core/src/repowise/core/providers/llm/openai.py +++ b/packages/core/src/repowise/core/providers/llm/openai.py @@ -35,9 +35,12 @@ RateLimitError, ) -from typing import Any, AsyncIterator +from typing import TYPE_CHECKING, Any, AsyncIterator from repowise.core.rate_limiter import RateLimiter +if TYPE_CHECKING: + from repowise.core.generation.cost_tracker import CostTracker + log = structlog.get_logger(__name__) _MAX_RETRIES = 3 @@ -61,6 +64,7 @@ def __init__( model: str = "gpt-5.4-nano", base_url: str | None = None, rate_limiter: RateLimiter | None = None, + cost_tracker: "CostTracker | None" = None, ) -> None: resolved_key = api_key or os.environ.get("OPENAI_API_KEY") if not resolved_key: @@ -71,6 +75,7 @@ def __init__( self._client = AsyncOpenAI(api_key=resolved_key, base_url=base_url) self._model = model 
self._rate_limiter = rate_limiter + self._cost_tracker = cost_tracker @property def provider_name(self) -> str: @@ -161,6 +166,23 @@ async def _generate_with_retry( output_tokens=result.output_tokens, request_id=request_id, ) + + if self._cost_tracker is not None: + import asyncio + + try: + asyncio.get_event_loop().create_task( + self._cost_tracker.record( + model=self._model, + input_tokens=result.input_tokens, + output_tokens=result.output_tokens, + operation="doc_generation", + file_path=None, + ) + ) + except RuntimeError: + pass # No running event loop — skip async record + return result # --- ChatProvider protocol implementation --- diff --git a/packages/server/src/repowise/server/mcp_server/tool_dead_code.py b/packages/server/src/repowise/server/mcp_server/tool_dead_code.py index 8596233..364b104 100644 --- a/packages/server/src/repowise/server/mcp_server/tool_dead_code.py +++ b/packages/server/src/repowise/server/mcp_server/tool_dead_code.py @@ -28,6 +28,10 @@ async def get_dead_code( directory: str | None = None, owner: str | None = None, group_by: str | None = None, + include_internals: bool = False, + include_zombie_packages: bool = True, + no_unreachable: bool = False, + no_unused_exports: bool = False, ) -> dict: """Get a tiered refactor plan for dead and unused code. @@ -39,16 +43,39 @@ async def get_dead_code( group_by="owner" to see who owns the most dead code. Use tier to focus on a single confidence band. + Scan scope flags (mirror the DeadCodeAnalyzer.analyze config): + - Use ``min_confidence=0.7`` for high-confidence cleanups — filters out + speculative findings and surfaces only code with zero references that + hasn't been touched in months. Ideal before a release or refactor sprint. + - Use ``include_internals=True`` for aggressive scans of private symbols + (functions/variables prefixed with _ or declared without exports). This + has a higher false-positive rate and is off by default; enable it when + doing a thorough dead-code purge of a stable, well-tested module. + - Use ``no_unreachable=True`` to skip file-level reachability checks and + focus only on symbol-level findings (unused exports/internals). + - Use ``no_unused_exports=True`` to skip public-export checks, e.g. when + you know the repo exposes a public API consumed externally. + - Use ``include_zombie_packages=False`` to suppress monorepo package + findings, useful in repos where cross-package imports are intentionally + absent during development. + Args: repo: Repository path, name, or ID. kind: Filter by kind (unreachable_file, unused_export, unused_internal, zombie_package). - min_confidence: Minimum confidence threshold (default 0.5). + min_confidence: Minimum confidence threshold (default 0.5). Use 0.7 for high-confidence + cleanups only. safe_only: Only return findings marked safe_to_delete (default false). limit: Maximum findings per tier (default 20). tier: Focus on a single tier: "high" (>=0.8), "medium" (0.5-0.8), or "low" (<0.5). directory: Filter findings to a specific directory prefix. owner: Filter findings by primary owner name. group_by: "directory" for per-directory rollup, "owner" for ownership hotspots. + include_internals: Include unused private/internal symbol findings (default false). + Enable for aggressive scans of private symbols. + include_zombie_packages: Include zombie-package findings for monorepo packages with + no external importers (default true). + no_unreachable: Suppress unreachable-file findings (default false). 
+ no_unused_exports: Suppress unused-export findings (default false). """ async with get_session(_state._session_factory) as session: repository = await _get_repo(session, repo) @@ -73,10 +100,23 @@ async def get_dead_code( ) git_meta_map = {g.file_path: g for g in git_res.scalars().all()} + # --- Build excluded kinds from scope flags --- + _excluded_kinds: set[str] = set() + if no_unreachable: + _excluded_kinds.add("unreachable_file") + if no_unused_exports: + _excluded_kinds.add("unused_export") + if not include_internals: + _excluded_kinds.add("unused_internal") + if not include_zombie_packages: + _excluded_kinds.add("zombie_package") + # --- Apply filters --- filtered = all_findings if kind: filtered = [f for f in filtered if f.kind == kind] + elif _excluded_kinds: + filtered = [f for f in filtered if f.kind not in _excluded_kinds] if safe_only: filtered = [f for f in filtered if f.safe_to_delete] if min_confidence > 0: diff --git a/packages/server/src/repowise/server/mcp_server/tool_overview.py b/packages/server/src/repowise/server/mcp_server/tool_overview.py index 75ad0d1..fb0063a 100644 --- a/packages/server/src/repowise/server/mcp_server/tool_overview.py +++ b/packages/server/src/repowise/server/mcp_server/tool_overview.py @@ -2,7 +2,7 @@ from __future__ import annotations -from collections import Counter +from collections import Counter, defaultdict from typing import Any from sqlalchemy import select @@ -117,6 +117,78 @@ async def get_overview(repo: str | None = None) -> dict: "top_churn_modules": top_modules, } + # B. Knowledge map ------------------------------------------------------- + knowledge_map: dict[str, Any] = {} + if all_git: + # top_owners: aggregate primary_owner_email across all files + owner_file_count: dict[str, int] = defaultdict(int) + owner_pct_sum: dict[str, float] = defaultdict(float) + for g in all_git: + email = g.primary_owner_email or "" + if email: + owner_file_count[email] += 1 + owner_pct_sum[email] += float(g.primary_owner_commit_pct or 0.0) + + total_files = len(all_git) or 1 + top_owners = sorted( + [ + { + "email": email, + "files_owned": count, + "percentage": round(count / total_files * 100.0, 1), + } + for email, count in owner_file_count.items() + ], + key=lambda x: -x["files_owned"], + )[:10] + + # knowledge_silos: files where primary owner has > 80% ownership + knowledge_silos = [ + g.file_path + for g in all_git + if (g.primary_owner_commit_pct or 0.0) > 0.8 + ] + + # onboarding_targets: high-centrality files with least docs + # pagerank from graph_nodes; doc length from wiki_pages + node_result = await session.execute( + select(GraphNode).where( + GraphNode.repository_id == repository.id, + GraphNode.is_test == False, # noqa: E712 + ) + ) + all_nodes = node_result.scalars().all() + + # Build word-count map from wiki_pages (file pages) + page_result = await session.execute( + select(Page).where( + Page.repository_id == repository.id, + Page.page_type == "file_page", + ) + ) + doc_words: dict[str, int] = { + p.target_path: len(p.content.split()) for p in page_result.scalars().all() + } + + onboarding_candidates = [ + { + "path": n.node_id, + "pagerank": n.pagerank, + "doc_words": doc_words.get(n.node_id, 0), + } + for n in all_nodes + if n.pagerank > 0.0 + ] + # Sort by fewest doc words first (least documented), then by highest pagerank + onboarding_candidates.sort(key=lambda x: (x["doc_words"], -x["pagerank"])) + onboarding_targets = [c["path"] for c in onboarding_candidates[:5]] + + knowledge_map = { + "top_owners": top_owners, + 
"knowledge_silos": knowledge_silos, + "onboarding_targets": onboarding_targets, + } + return { "title": overview_page.title if overview_page else repository.name, "content_md": overview_page.content if overview_page else "No overview generated yet.", @@ -135,4 +207,5 @@ async def get_overview(repo: str | None = None) -> dict: ], "entry_points": [n.node_id for n in entry_nodes], "git_health": git_health, + "knowledge_map": knowledge_map, } diff --git a/packages/server/src/repowise/server/mcp_server/tool_risk.py b/packages/server/src/repowise/server/mcp_server/tool_risk.py index f8cae89..7cad1a7 100644 --- a/packages/server/src/repowise/server/mcp_server/tool_risk.py +++ b/packages/server/src/repowise/server/mcp_server/tool_risk.py @@ -11,6 +11,8 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import text + from repowise.core.persistence.database import get_session from repowise.core.persistence.models import ( GitMetadata, @@ -127,6 +129,48 @@ def _compute_impact_surface( return ranked[:3] +async def _check_test_gap(session: AsyncSession, repo_id: str, target: str) -> bool: + """Return True if no test file corresponding to *target* exists in graph_nodes.""" + import os + + base = os.path.splitext(os.path.basename(target))[0] + ext = os.path.splitext(target)[1].lstrip(".") + # Build a LIKE pattern broad enough to catch test_, _test, .spec.* + patterns = [f"%test_{base}%", f"%{base}_test%", f"%{base}.spec.{ext}%"] + for pat in patterns: + row = await session.execute( + select(GraphNode).where( + GraphNode.repository_id == repo_id, + GraphNode.is_test == True, # noqa: E712 + GraphNode.node_id.like(pat), + ).limit(1) + ) + if row.scalar_one_or_none() is not None: + return False + return True + + +async def _get_security_signals( + session: AsyncSession, repo_id: str, target: str +) -> list[dict]: + """Fetch stored security findings for *target* from security_findings table.""" + try: + rows = await session.execute( + text( + "SELECT kind, severity, snippet FROM security_findings " + "WHERE repository_id = :repo_id AND file_path = :fp " + "ORDER BY severity DESC, kind" + ), + {"repo_id": repo_id, "fp": target}, + ) + return [ + {"kind": r[0], "severity": r[1], "snippet": r[2]} + for r in rows.all() + ] + except Exception: # noqa: BLE001 — table may not exist pre-migration + return [] + + async def _assess_one_target( session: AsyncSession, repository: Repository, @@ -136,7 +180,12 @@ async def _assess_one_target( reverse_deps: dict[str, set[str]], node_meta: dict[str, Any], ) -> dict: - """Assess risk for a single target file.""" + """Assess risk for a single target file. + + Enriches each result with: + - test_gap: bool — True when no test file matching this file's basename exists. + - security_signals: list of {kind, severity, snippet} from security_findings. + """ repo_id = repository.id result_data: dict[str, Any] = {"target": target} @@ -164,6 +213,8 @@ async def _assess_one_target( reverse_deps, node_meta, ) + result_data["test_gap"] = await _check_test_gap(session, repo_id, target) + result_data["security_signals"] = await _get_security_signals(session, repo_id, target) result_data["risk_summary"] = f"{target} — no git metadata available" return result_data @@ -240,6 +291,10 @@ async def _assess_one_target( if merge_commit_count > 0: result_data["merge_commit_count_90d"] = merge_commit_count + # C. 
Test gaps + security signals + result_data["test_gap"] = await _check_test_gap(session, repo_id, target) + result_data["security_signals"] = await _get_security_signals(session, repo_id, target) + capped = getattr(meta, "commit_count_capped", False) capped_note = " (history truncated — actual count may be higher)" if capped else "" result_data["commit_count_capped"] = capped @@ -262,6 +317,7 @@ async def _assess_one_target( async def get_risk( targets: list[str], repo: str | None = None, + changed_files: list[str] | None = None, ) -> dict: """Assess modification risk for one or more files before making changes. @@ -270,14 +326,26 @@ async def get_risk( - risk_type ("churn-heavy"/"bug-prone"/"high-coupling"/"stable") - impact_surface: top 3 critical modules that would break - dependents, co-change partners, ownership + - test_gap: bool — True if no test file exists for this file + - security_signals: list of {kind, severity, snippet} from static analysis Plus the top 5 global hotspots for ambient awareness. - Example: get_risk(["src/auth/service.py", "src/auth/middleware.py"]) + Pass ``changed_files`` for PR review / blast radius analysis. When provided, + the response includes an additional ``pr_blast_radius`` key containing: + - direct_risks: per-file risk score (centrality × temporal hotspot) + - transitive_affected: files that import any changed file (up to depth 3) + - cochange_warnings: historical co-change partners missing from the PR + - recommended_reviewers: top 5 owners of affected files + - test_gaps: changed/affected files lacking a corresponding test + - overall_risk_score: 0-10 composite score + + Example: get_risk(["src/auth/service.py"], changed_files=["src/auth/service.py"]) Args: - targets: List of file paths to assess. + targets: List of file paths to assess (standard per-file risk). repo: Repository path, name, or ID. + changed_files: Optional list of files changed in a PR for blast-radius analysis. """ async with get_session(_state._session_factory) as session: repository = await _get_repo(session, repo) @@ -343,7 +411,18 @@ async def get_risk( if h.file_path not in target_set ][:5] - return { + # A. PR blast radius (only when caller passes changed_files) + pr_blast_radius: dict | None = None + if changed_files: + from repowise.core.analysis.pr_blast import PRBlastRadiusAnalyzer + + analyzer = PRBlastRadiusAnalyzer(session, repo_id) + pr_blast_radius = await analyzer.analyze_files(changed_files) + + response: dict = { "targets": {r["target"]: r for r in results}, "global_hotspots": global_hotspots, } + if pr_blast_radius is not None: + response["pr_blast_radius"] = pr_blast_radius + return response diff --git a/tests/unit/persistence/test_models.py b/tests/unit/persistence/test_models.py index b9333c0..d4f9f81 100644 --- a/tests/unit/persistence/test_models.py +++ b/tests/unit/persistence/test_models.py @@ -304,5 +304,7 @@ def test_base_includes_all_models(): "decision_records", "conversations", "chat_messages", + "llm_costs", + "security_findings", } assert expected == table_names From b51cc0083d7e7bb343227868f26d04776717b1d2 Mon Sep 17 00:00:00 2001 From: RaghavChamadiya Date: Tue, 7 Apr 2026 16:58:10 +0530 Subject: [PATCH 2/4] feat(web): surface pipeline-overhaul capabilities in web UI P0: - Cost tracking: GET /api/repos/{id}/costs[/summary], /repos/[id]/costs page with day/model/operation grouping, Recharts chart, sidebar nav. - Hotspot table: temporal_hotspot_score in HotspotResponse, ordered by trend, new sortable Trend column with flame indicator. 
- Test gap: test_gap on GitMetadataResponse, "No tests" badge in wiki sidebar. - Security findings: GET /api/repos/{id}/security router, SecurityPanel client component on wiki page right sidebar. P1: - PR blast radius: POST /api/repos/{id}/blast-radius, interactive page with risk score gauge, direct/transitive/co-change/reviewer/test-gap tables, sidebar + mobile nav entry. - Knowledge map: extracted compute_knowledge_map() service, REST endpoint, 3-column overview card (top owners / silos / onboarding targets). - Coordinator drift: GET /api/repos/{id}/health/coordinator, System Health card on settings page with color-coded status. - Live cost on web progress: SSE stream now sums llm_costs since job start, generation-progress component shows running cost with live indicator. --- packages/server/src/repowise/server/app.py | 8 + .../server/mcp_server/tool_overview.py | 79 +--- .../repowise/server/routers/blast_radius.py | 36 ++ .../src/repowise/server/routers/costs.py | 111 ++++++ .../server/src/repowise/server/routers/git.py | 11 +- .../src/repowise/server/routers/health.py | 54 ++- .../src/repowise/server/routers/jobs.py | 15 +- .../repowise/server/routers/knowledge_map.py | 46 +++ .../src/repowise/server/routers/security.py | 52 +++ .../server/src/repowise/server/schemas.py | 118 ++++++ .../src/repowise/server/services/__init__.py | 1 + .../repowise/server/services/knowledge_map.py | 102 +++++ .../src/app/repos/[id]/blast-radius/page.tsx | 368 ++++++++++++++++++ .../web/src/app/repos/[id]/costs/page.tsx | 236 +++++++++++ .../web/src/app/repos/[id]/overview/page.tsx | 112 +++++- .../web/src/app/repos/[id]/settings/page.tsx | 14 + .../app/repos/[id]/wiki/[...slug]/page.tsx | 11 + .../web/src/components/git/hotspot-table.tsx | 73 +++- .../components/jobs/generation-progress.tsx | 19 +- .../web/src/components/layout/mobile-nav.tsx | 2 + .../web/src/components/layout/sidebar.tsx | 4 + .../repos/coordinator-health-panel.tsx | 86 ++++ .../src/components/wiki/git-history-panel.tsx | 3 +- .../src/components/wiki/security-panel.tsx | 63 +++ packages/web/src/lib/api/blast-radius.ts | 57 +++ packages/web/src/lib/api/costs.ts | 44 +++ packages/web/src/lib/api/health.ts | 12 + packages/web/src/lib/api/knowledge-map.ts | 38 ++ packages/web/src/lib/api/security.ts | 25 ++ packages/web/src/lib/api/types.ts | 3 + 30 files changed, 1719 insertions(+), 84 deletions(-) create mode 100644 packages/server/src/repowise/server/routers/blast_radius.py create mode 100644 packages/server/src/repowise/server/routers/costs.py create mode 100644 packages/server/src/repowise/server/routers/knowledge_map.py create mode 100644 packages/server/src/repowise/server/routers/security.py create mode 100644 packages/server/src/repowise/server/services/__init__.py create mode 100644 packages/server/src/repowise/server/services/knowledge_map.py create mode 100644 packages/web/src/app/repos/[id]/blast-radius/page.tsx create mode 100644 packages/web/src/app/repos/[id]/costs/page.tsx create mode 100644 packages/web/src/components/repos/coordinator-health-panel.tsx create mode 100644 packages/web/src/components/wiki/security-panel.tsx create mode 100644 packages/web/src/lib/api/blast-radius.ts create mode 100644 packages/web/src/lib/api/costs.ts create mode 100644 packages/web/src/lib/api/knowledge-map.ts create mode 100644 packages/web/src/lib/api/security.ts diff --git a/packages/server/src/repowise/server/app.py b/packages/server/src/repowise/server/app.py index ddefcda..14ef79f 100644 --- 
a/packages/server/src/repowise/server/app.py +++ b/packages/server/src/repowise/server/app.py @@ -28,18 +28,22 @@ from repowise.core.providers.embedding.base import MockEmbedder from repowise.server import __version__ from repowise.server.routers import ( + blast_radius, chat, claude_md, + costs, dead_code, decisions, git, graph, health, jobs, + knowledge_map, pages, providers, repos, search, + security, symbols, webhooks, ) @@ -187,5 +191,9 @@ async def bad_request_handler(request: Request, exc: ValueError) -> JSONResponse app.include_router(decisions.router) app.include_router(chat.router) app.include_router(providers.router) + app.include_router(costs.router) + app.include_router(security.router) + app.include_router(blast_radius.router) + app.include_router(knowledge_map.router) return app diff --git a/packages/server/src/repowise/server/mcp_server/tool_overview.py b/packages/server/src/repowise/server/mcp_server/tool_overview.py index fb0063a..56198d9 100644 --- a/packages/server/src/repowise/server/mcp_server/tool_overview.py +++ b/packages/server/src/repowise/server/mcp_server/tool_overview.py @@ -2,7 +2,7 @@ from __future__ import annotations -from collections import Counter, defaultdict +from collections import Counter from typing import Any from sqlalchemy import select @@ -16,6 +16,7 @@ from repowise.server.mcp_server import _state from repowise.server.mcp_server._helpers import _get_repo from repowise.server.mcp_server._server import mcp +from repowise.server.services.knowledge_map import compute_knowledge_map @mcp.tool() @@ -118,76 +119,16 @@ async def get_overview(repo: str | None = None) -> dict: } # B. Knowledge map ------------------------------------------------------- - knowledge_map: dict[str, Any] = {} - if all_git: - # top_owners: aggregate primary_owner_email across all files - owner_file_count: dict[str, int] = defaultdict(int) - owner_pct_sum: dict[str, float] = defaultdict(float) - for g in all_git: - email = g.primary_owner_email or "" - if email: - owner_file_count[email] += 1 - owner_pct_sum[email] += float(g.primary_owner_commit_pct or 0.0) - - total_files = len(all_git) or 1 - top_owners = sorted( - [ - { - "email": email, - "files_owned": count, - "percentage": round(count / total_files * 100.0, 1), - } - for email, count in owner_file_count.items() - ], - key=lambda x: -x["files_owned"], - )[:10] - - # knowledge_silos: files where primary owner has > 80% ownership - knowledge_silos = [ - g.file_path - for g in all_git - if (g.primary_owner_commit_pct or 0.0) > 0.8 + knowledge_map = await compute_knowledge_map(session, repository.id) + # Flatten onboarding_targets to a list of paths (MCP tool backward compat) + if knowledge_map and "onboarding_targets" in knowledge_map: + knowledge_map = dict(knowledge_map) + knowledge_map["onboarding_targets"] = [ + t["path"] for t in knowledge_map["onboarding_targets"] ] - - # onboarding_targets: high-centrality files with least docs - # pagerank from graph_nodes; doc length from wiki_pages - node_result = await session.execute( - select(GraphNode).where( - GraphNode.repository_id == repository.id, - GraphNode.is_test == False, # noqa: E712 - ) - ) - all_nodes = node_result.scalars().all() - - # Build word-count map from wiki_pages (file pages) - page_result = await session.execute( - select(Page).where( - Page.repository_id == repository.id, - Page.page_type == "file_page", - ) - ) - doc_words: dict[str, int] = { - p.target_path: len(p.content.split()) for p in page_result.scalars().all() - } - - onboarding_candidates = [ 
- { - "path": n.node_id, - "pagerank": n.pagerank, - "doc_words": doc_words.get(n.node_id, 0), - } - for n in all_nodes - if n.pagerank > 0.0 + knowledge_map["knowledge_silos"] = [ + s["file_path"] for s in knowledge_map["knowledge_silos"] ] - # Sort by fewest doc words first (least documented), then by highest pagerank - onboarding_candidates.sort(key=lambda x: (x["doc_words"], -x["pagerank"])) - onboarding_targets = [c["path"] for c in onboarding_candidates[:5]] - - knowledge_map = { - "top_owners": top_owners, - "knowledge_silos": knowledge_silos, - "onboarding_targets": onboarding_targets, - } return { "title": overview_page.title if overview_page else repository.name, diff --git a/packages/server/src/repowise/server/routers/blast_radius.py b/packages/server/src/repowise/server/routers/blast_radius.py new file mode 100644 index 0000000..f9066a2 --- /dev/null +++ b/packages/server/src/repowise/server/routers/blast_radius.py @@ -0,0 +1,36 @@ +"""/api/repos/{repo_id}/blast-radius — PR blast radius analysis endpoint.""" + +from __future__ import annotations + +from sqlalchemy.ext.asyncio import AsyncSession + +from fastapi import APIRouter, Depends +from repowise.core.analysis.pr_blast import PRBlastRadiusAnalyzer +from repowise.server.deps import get_db_session, verify_api_key +from repowise.server.schemas import BlastRadiusRequest, BlastRadiusResponse + +router = APIRouter( + prefix="/api/repos", + tags=["blast-radius"], + dependencies=[Depends(verify_api_key)], +) + + +@router.post("/{repo_id}/blast-radius", response_model=BlastRadiusResponse) +async def analyze_blast_radius( + repo_id: str, + body: BlastRadiusRequest, + session: AsyncSession = Depends(get_db_session), # noqa: B008 +) -> BlastRadiusResponse: + """Compute blast radius for a proposed PR given its changed files. + + Returns direct risk scores per file, transitively affected files (BFS up to + max_depth), historical co-change warnings, recommended reviewers, test gaps, + and an overall risk score (0–10). 
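For orientation, a minimal sketch of how a client might exercise this endpoint once the server is running. The base URL and repository id are placeholders, and the request omits whatever API key verify_api_key expects, so a real call would also need that header; the field names follow the BlastRadiusRequest/BlastRadiusResponse schemas added later in this patch.

    import httpx

    # Hypothetical local server address and repo id.
    resp = httpx.post(
        "http://localhost:8000/api/repos/my-repo-id/blast-radius",
        json={"changed_files": ["src/auth/service.py"], "max_depth": 3},
    )
    resp.raise_for_status()
    data = resp.json()
    # 0-10 composite score plus the supporting detail lists.
    print(data["overall_risk_score"])
    print(len(data["transitive_affected"]), "transitively affected files")

The same PRBlastRadiusAnalyzer backs the MCP get_risk tool when changed_files is passed, so agent clients receive an equivalent pr_blast_radius payload without calling REST directly.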
+ """ + analyzer = PRBlastRadiusAnalyzer(session=session, repo_id=repo_id) + result = await analyzer.analyze_files( + changed_files=body.changed_files, + max_depth=body.max_depth, + ) + return BlastRadiusResponse(**result) diff --git a/packages/server/src/repowise/server/routers/costs.py b/packages/server/src/repowise/server/routers/costs.py new file mode 100644 index 0000000..92f8ca1 --- /dev/null +++ b/packages/server/src/repowise/server/routers/costs.py @@ -0,0 +1,111 @@ +"""/api/repos/{repo_id}/costs — LLM cost tracking endpoints.""" + +from __future__ import annotations + +from datetime import datetime, date + +import sqlalchemy as sa +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from fastapi import APIRouter, Depends, Query +from repowise.core.persistence.models import LlmCost +from repowise.server.deps import get_db_session, verify_api_key +from repowise.server.schemas import CostGroupResponse, CostSummaryResponse + +router = APIRouter( + prefix="/api/repos", + tags=["costs"], + dependencies=[Depends(verify_api_key)], +) + + +def _parse_since(since: str | None) -> datetime | None: + """Parse an ISO date string (YYYY-MM-DD) into a datetime, or return None.""" + if since is None: + return None + try: + return datetime.fromisoformat(since) + except ValueError: + # Try date-only format + return datetime.combine(date.fromisoformat(since), datetime.min.time()) + + +@router.get("/{repo_id}/costs/summary", response_model=CostSummaryResponse) +async def get_cost_summary( + repo_id: str, + since: str | None = Query(None, description="ISO date filter, e.g. 2025-01-01"), + session: AsyncSession = Depends(get_db_session), # noqa: B008 +) -> CostSummaryResponse: + """Return aggregate cost totals for a repository.""" + since_dt = _parse_since(since) + + stmt = sa.select( + sa.func.count().label("calls"), + sa.func.sum(LlmCost.input_tokens).label("input_tokens"), + sa.func.sum(LlmCost.output_tokens).label("output_tokens"), + sa.func.sum(LlmCost.cost_usd).label("cost_usd"), + ).where(LlmCost.repository_id == repo_id) + + if since_dt is not None: + stmt = stmt.where(LlmCost.ts >= since_dt) + + result = await session.execute(stmt) + row = result.one() + + return CostSummaryResponse( + total_cost_usd=row.cost_usd or 0.0, + total_calls=row.calls or 0, + total_input_tokens=row.input_tokens or 0, + total_output_tokens=row.output_tokens or 0, + since=since, + ) + + +@router.get("/{repo_id}/costs", response_model=list[CostGroupResponse]) +async def list_costs( + repo_id: str, + since: str | None = Query(None, description="ISO date filter, e.g. 
2025-01-01"), + by: str = Query("day", description="Grouping dimension: operation | model | day"), + session: AsyncSession = Depends(get_db_session), # noqa: B008 +) -> list[CostGroupResponse]: + """Return grouped cost totals for a repository.""" + since_dt = _parse_since(since) + + if by == "model": + group_col = LlmCost.model + elif by == "day": + group_col = sa.func.strftime("%Y-%m-%d", LlmCost.ts) + else: + # Default: operation + group_col = LlmCost.operation + + stmt = ( + sa.select( + group_col.label("group"), + sa.func.count().label("calls"), + sa.func.sum(LlmCost.input_tokens).label("input_tokens"), + sa.func.sum(LlmCost.output_tokens).label("output_tokens"), + sa.func.sum(LlmCost.cost_usd).label("cost_usd"), + ) + .where(LlmCost.repository_id == repo_id) + .group_by(group_col) + .order_by(sa.func.sum(LlmCost.cost_usd).desc()) + ) + + if since_dt is not None: + stmt = stmt.where(LlmCost.ts >= since_dt) + + result = await session.execute(stmt) + rows = result.fetchall() + + return [ + CostGroupResponse( + group=row.group or "(unknown)", + calls=row.calls or 0, + input_tokens=row.input_tokens or 0, + output_tokens=row.output_tokens or 0, + cost_usd=row.cost_usd or 0.0, + ) + for row in rows + ] diff --git a/packages/server/src/repowise/server/routers/git.py b/packages/server/src/repowise/server/routers/git.py index 57e80fe..1d3cdd4 100644 --- a/packages/server/src/repowise/server/routers/git.py +++ b/packages/server/src/repowise/server/routers/git.py @@ -11,6 +11,7 @@ from repowise.core.persistence import crud from repowise.core.persistence.models import GitMetadata from repowise.server.deps import get_db_session, verify_api_key +from repowise.server.mcp_server.tool_risk import _check_test_gap from repowise.server.schemas import ( GitMetadataResponse, GitSummaryResponse, @@ -35,7 +36,9 @@ async def get_git_metadata( meta = await crud.get_git_metadata(session, repo_id, file_path) if meta is None: raise HTTPException(status_code=404, detail="Git metadata not found") - return GitMetadataResponse.from_orm(meta) + response = GitMetadataResponse.from_orm(meta) + response.test_gap = await _check_test_gap(session, repo_id, file_path) + return response @router.get("/{repo_id}/hotspots", response_model=list[HotspotResponse]) @@ -51,7 +54,10 @@ async def get_hotspots( GitMetadata.repository_id == repo_id, GitMetadata.is_hotspot.is_(True), ) - .order_by(GitMetadata.churn_percentile.desc()) + .order_by( + GitMetadata.temporal_hotspot_score.desc().nulls_last(), + GitMetadata.churn_percentile.desc(), + ) .limit(limit) ) rows = result.scalars().all() @@ -61,6 +67,7 @@ async def get_hotspots( commit_count_90d=r.commit_count_90d, commit_count_30d=r.commit_count_30d, churn_percentile=r.churn_percentile, + temporal_hotspot_score=r.temporal_hotspot_score, primary_owner=r.primary_owner_name, is_hotspot=r.is_hotspot, is_stable=r.is_stable, diff --git a/packages/server/src/repowise/server/routers/health.py b/packages/server/src/repowise/server/routers/health.py index 84afa33..caca4a1 100644 --- a/packages/server/src/repowise/server/routers/health.py +++ b/packages/server/src/repowise/server/routers/health.py @@ -6,13 +6,16 @@ from __future__ import annotations from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.sql import text -from fastapi import APIRouter, Request +from fastapi import APIRouter, Depends, Request +from repowise.core.persistence.coordinator import AtomicStorageCoordinator from repowise.core.persistence.database import get_session from 
repowise.core.persistence.models import GenerationJob, Page from repowise.server import __version__ -from repowise.server.schemas import HealthResponse +from repowise.server.deps import get_db_session, get_vector_store +from repowise.server.schemas import CoordinatorHealthResponse, HealthResponse router = APIRouter(tags=["health"]) @@ -74,3 +77,50 @@ async def metrics(request: Request) -> str: from starlette.responses import Response return Response(content="\n".join(lines) + "\n", media_type="text/plain") + + +_repo_health_router = APIRouter(prefix="/api/repos", tags=["health"]) + + +@_repo_health_router.get("/{repo_id}/health/coordinator", response_model=CoordinatorHealthResponse) +async def coordinator_health( + repo_id: str, + session: AsyncSession = Depends(get_db_session), # noqa: B008 + vector_store=Depends(get_vector_store), # noqa: B008 +) -> CoordinatorHealthResponse: + """Return coordinator drift health for a repository.""" + coord = AtomicStorageCoordinator(session, graph_builder=None, vector_store=vector_store) + result = await coord.health_check() + + sql_pages: int | None = result.get("sql_pages") + vector_count: int | None = result.get("vector_count") + graph_nodes: int | None = result.get("graph_nodes") + drift: float | None = result.get("drift") + + # Normalise vector_count: -1 means unsupported (return None) + if vector_count == -1: + vector_count = None + + # Drift percentage (0–100) + drift_pct: float | None = round(drift * 100, 2) if drift is not None else None + + if drift_pct is None: + status = "ok" + elif drift_pct <= 1.0: + status = "ok" + elif drift_pct <= 5.0: + status = "warning" + else: + status = "critical" + + return CoordinatorHealthResponse( + sql_pages=sql_pages, + vector_count=vector_count, + graph_nodes=graph_nodes, + drift_pct=drift_pct, + status=status, + ) + + +# Merge repo-scoped routes into the main router so they are registered together. 
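As a quick sanity check of the thresholds above: a reported drift of 0.003 becomes drift_pct 0.3 and status "ok", 0.03 becomes 3.0 and "warning", and 0.12 becomes 12.0 and "critical". A standalone sketch of the same mapping, with thresholds copied from the handler (the function name is invented for illustration):

    def classify_drift(drift: float | None) -> str:
        """Mirror coordinator_health: <= 1% ok, <= 5% warning, otherwise critical."""
        if drift is None:
            return "ok"
        pct = round(drift * 100, 2)
        if pct <= 1.0:
            return "ok"
        if pct <= 5.0:
            return "warning"
        return "critical"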
+router.include_router(_repo_health_router) diff --git a/packages/server/src/repowise/server/routers/jobs.py b/packages/server/src/repowise/server/routers/jobs.py index b8edfbf..cf57763 100644 --- a/packages/server/src/repowise/server/routers/jobs.py +++ b/packages/server/src/repowise/server/routers/jobs.py @@ -5,14 +5,14 @@ import asyncio import json -from sqlalchemy import select +from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from starlette.responses import StreamingResponse from fastapi import APIRouter, Depends, HTTPException, Query, Request from repowise.core.persistence import crud from repowise.core.persistence.database import get_session -from repowise.core.persistence.models import GenerationJob +from repowise.core.persistence.models import GenerationJob, LlmCost from repowise.server.deps import get_db_session, verify_api_key from repowise.server.schemas import JobResponse @@ -79,6 +79,16 @@ async def event_generator(): yield f"event: error\ndata: {data}\n\n" return + # Sum LLM costs recorded since the job started + actual_cost_usd: float | None = None + if job.started_at is not None: + cost_q = select(func.sum(LlmCost.cost_usd)).where( + LlmCost.repository_id == job.repository_id, + LlmCost.ts >= job.started_at, + ) + async with get_session(factory) as cost_session: + actual_cost_usd = await cost_session.scalar(cost_q) + progress = { "job_id": job.id, "status": job.status, @@ -86,6 +96,7 @@ async def event_generator(): "total_pages": job.total_pages, "failed_pages": job.failed_pages, "current_level": job.current_level, + "actual_cost_usd": actual_cost_usd, } data = json.dumps(progress) yield f"event: progress\ndata: {data}\n\n" diff --git a/packages/server/src/repowise/server/routers/knowledge_map.py b/packages/server/src/repowise/server/routers/knowledge_map.py new file mode 100644 index 0000000..6931f63 --- /dev/null +++ b/packages/server/src/repowise/server/routers/knowledge_map.py @@ -0,0 +1,46 @@ +"""/api/repos/{repo_id}/knowledge-map — Knowledge map endpoint.""" + +from __future__ import annotations + +from sqlalchemy.ext.asyncio import AsyncSession + +from fastapi import APIRouter, Depends, HTTPException +from repowise.server.deps import get_db_session, verify_api_key +from repowise.server.schemas import ( + KnowledgeMapOwner, + KnowledgeMapResponse, + KnowledgeMapSilo, + KnowledgeMapTarget, +) +from repowise.server.services.knowledge_map import compute_knowledge_map + +router = APIRouter( + prefix="/api/repos", + tags=["knowledge-map"], + dependencies=[Depends(verify_api_key)], +) + + +@router.get("/{repo_id}/knowledge-map", response_model=KnowledgeMapResponse) +async def get_knowledge_map( + repo_id: str, + session: AsyncSession = Depends(get_db_session), # noqa: B008 +) -> KnowledgeMapResponse: + """Return knowledge-map data for a repository. + + Includes top owners by file count, knowledge silos (files with >80 % + single-owner concentration), and onboarding targets (high-centrality + files with least documentation). + """ + data = await compute_knowledge_map(session, repo_id) + if not data: + raise HTTPException( + status_code=404, + detail="No git metadata found for this repository. 
Run an index first.", + ) + + return KnowledgeMapResponse( + top_owners=[KnowledgeMapOwner(**o) for o in data["top_owners"]], + knowledge_silos=[KnowledgeMapSilo(**s) for s in data["knowledge_silos"]], + onboarding_targets=[KnowledgeMapTarget(**t) for t in data["onboarding_targets"]], + ) diff --git a/packages/server/src/repowise/server/routers/security.py b/packages/server/src/repowise/server/routers/security.py new file mode 100644 index 0000000..cdd76a9 --- /dev/null +++ b/packages/server/src/repowise/server/routers/security.py @@ -0,0 +1,52 @@ +"""/api/repos/{repo_id}/security — Security findings endpoints.""" + +from __future__ import annotations + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from fastapi import APIRouter, Depends, Query +from repowise.core.persistence.models import SecurityFinding +from repowise.server.deps import get_db_session, verify_api_key +from repowise.server.schemas import SecurityFindingResponse + +router = APIRouter( + prefix="/api/repos", + tags=["security"], + dependencies=[Depends(verify_api_key)], +) + + +@router.get("/{repo_id}/security", response_model=list[SecurityFindingResponse]) +async def list_security_findings( + repo_id: str, + file_path: str | None = Query(None, description="Filter by relative file path"), + severity: str | None = Query(None, description="Filter by severity: high, med, or low"), + limit: int = Query(100, ge=1, le=500), + session: AsyncSession = Depends(get_db_session), # noqa: B008 +) -> list[SecurityFindingResponse]: + """List security findings for a repository, with optional filters.""" + stmt = select(SecurityFinding).where(SecurityFinding.repository_id == repo_id) + + if file_path is not None: + stmt = stmt.where(SecurityFinding.file_path == file_path) + + if severity is not None: + stmt = stmt.where(SecurityFinding.severity == severity) + + stmt = stmt.order_by(SecurityFinding.detected_at.desc()).limit(limit) + + result = await session.execute(stmt) + rows = result.scalars().all() + + return [ + SecurityFindingResponse( + id=row.id, + file_path=row.file_path, + kind=row.kind, + severity=row.severity, + snippet=row.snippet, + detected_at=row.detected_at, + ) + for row in rows + ] diff --git a/packages/server/src/repowise/server/schemas.py b/packages/server/src/repowise/server/schemas.py index bac0bfb..26451af 100644 --- a/packages/server/src/repowise/server/schemas.py +++ b/packages/server/src/repowise/server/schemas.py @@ -317,6 +317,7 @@ class GitMetadataResponse(BaseModel): avg_commit_size: float commit_categories: dict merge_commit_count_90d: int + test_gap: bool | None = None @classmethod def from_orm(cls, obj: object) -> GitMetadataResponse: @@ -356,6 +357,7 @@ class HotspotResponse(BaseModel): commit_count_90d: int commit_count_30d: int churn_percentile: float + temporal_hotspot_score: float | None = None primary_owner: str | None is_hotspot: bool is_stable: bool @@ -433,6 +435,20 @@ class DeadCodeSummaryResponse(BaseModel): by_kind: dict +# --------------------------------------------------------------------------- +# Security +# --------------------------------------------------------------------------- + + +class SecurityFindingResponse(BaseModel): + id: int + file_path: str + kind: str + severity: str + snippet: str | None + detected_at: datetime + + # --------------------------------------------------------------------------- # Repo Stats # --------------------------------------------------------------------------- @@ -539,6 +555,49 @@ class 
HotFilesGraphResponse(BaseModel): links: list[GraphEdgeResponse] +# --------------------------------------------------------------------------- +# Blast Radius +# --------------------------------------------------------------------------- + + +class BlastRadiusRequest(BaseModel): + changed_files: list[str] + max_depth: int = Field(default=3, ge=1, le=10) + + +class DirectRiskEntry(BaseModel): + path: str + risk_score: float + temporal_hotspot: float + centrality: float + + +class TransitiveEntry(BaseModel): + path: str + depth: int + + +class CochangeWarning(BaseModel): + changed: str + missing_partner: str + score: float + + +class ReviewerEntry(BaseModel): + email: str + files: int + ownership_pct: float + + +class BlastRadiusResponse(BaseModel): + direct_risks: list[DirectRiskEntry] + transitive_affected: list[TransitiveEntry] + cochange_warnings: list[CochangeWarning] + recommended_reviewers: list[ReviewerEntry] + test_gaps: list[str] + overall_risk_score: float + + # --------------------------------------------------------------------------- # Health # --------------------------------------------------------------------------- @@ -550,6 +609,44 @@ class HealthResponse(BaseModel): version: str +class CoordinatorHealthResponse(BaseModel): + sql_pages: int | None + vector_count: int | None + graph_nodes: int | None + drift_pct: float | None + status: str # "ok" | "warning" | "critical" + + +# --------------------------------------------------------------------------- +# Knowledge Map +# --------------------------------------------------------------------------- + + +class KnowledgeMapOwner(BaseModel): + email: str + name: str + files_owned: int + percentage: float + + +class KnowledgeMapSilo(BaseModel): + file_path: str + owner_email: str + owner_pct: float + + +class KnowledgeMapTarget(BaseModel): + path: str + pagerank: float + doc_words: int + + +class KnowledgeMapResponse(BaseModel): + top_owners: list[KnowledgeMapOwner] + knowledge_silos: list[KnowledgeMapSilo] + onboarding_targets: list[KnowledgeMapTarget] + + # --------------------------------------------------------------------------- # Decisions # --------------------------------------------------------------------------- @@ -691,3 +788,24 @@ class SetActiveProviderRequest(BaseModel): class SetApiKeyRequest(BaseModel): api_key: str + + +# --------------------------------------------------------------------------- +# Cost Tracking +# --------------------------------------------------------------------------- + + +class CostGroupResponse(BaseModel): + group: str + calls: int + input_tokens: int + output_tokens: int + cost_usd: float + + +class CostSummaryResponse(BaseModel): + total_cost_usd: float + total_calls: int + total_input_tokens: int + total_output_tokens: int + since: str | None diff --git a/packages/server/src/repowise/server/services/__init__.py b/packages/server/src/repowise/server/services/__init__.py new file mode 100644 index 0000000..d99cd3e --- /dev/null +++ b/packages/server/src/repowise/server/services/__init__.py @@ -0,0 +1 @@ +"""repowise server services.""" diff --git a/packages/server/src/repowise/server/services/knowledge_map.py b/packages/server/src/repowise/server/services/knowledge_map.py new file mode 100644 index 0000000..6bb8c83 --- /dev/null +++ b/packages/server/src/repowise/server/services/knowledge_map.py @@ -0,0 +1,102 @@ +"""Shared logic for computing the knowledge map for a repository.""" + +from __future__ import annotations + +from collections import defaultdict +from typing import Any + 
+from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from repowise.core.persistence.models import GitMetadata, GraphNode, Page + + +async def compute_knowledge_map(session: AsyncSession, repo_id: str) -> dict[str, Any]: + """Return knowledge-map data for *repo_id*. + + Returns a dict with keys: + top_owners — list of {email, name, files_owned, percentage} + knowledge_silos — list of {file_path, owner_email, owner_pct} + onboarding_targets — list of {path, pagerank, doc_words} + + Returns an empty dict when no git metadata is available. + """ + git_res = await session.execute( + select(GitMetadata).where(GitMetadata.repository_id == repo_id) + ) + all_git = git_res.scalars().all() + + if not all_git: + return {} + + # top_owners: aggregate primary_owner_email across all files + owner_file_count: dict[str, int] = defaultdict(int) + owner_name_map: dict[str, str] = {} + for g in all_git: + email = g.primary_owner_email or "" + if email: + owner_file_count[email] += 1 + if g.primary_owner_name: + owner_name_map[email] = g.primary_owner_name + + total_files = len(all_git) or 1 + top_owners = sorted( + [ + { + "email": email, + "name": owner_name_map.get(email, ""), + "files_owned": count, + "percentage": round(count / total_files * 100.0, 1), + } + for email, count in owner_file_count.items() + ], + key=lambda x: -x["files_owned"], + )[:10] + + # knowledge_silos: files where primary owner has > 80% ownership + knowledge_silos = [ + { + "file_path": g.file_path, + "owner_email": g.primary_owner_email or "", + "owner_pct": round(float(g.primary_owner_commit_pct or 0.0), 3), + } + for g in all_git + if (g.primary_owner_commit_pct or 0.0) > 0.8 + ] + + # onboarding_targets: high-centrality files with fewest docs + node_result = await session.execute( + select(GraphNode).where( + GraphNode.repository_id == repo_id, + GraphNode.is_test == False, # noqa: E712 + ) + ) + all_nodes = node_result.scalars().all() + + page_result = await session.execute( + select(Page).where( + Page.repository_id == repo_id, + Page.page_type == "file_page", + ) + ) + doc_words: dict[str, int] = { + p.target_path: len(p.content.split()) for p in page_result.scalars().all() + } + + candidates = [ + { + "path": n.node_id, + "pagerank": n.pagerank, + "doc_words": doc_words.get(n.node_id, 0), + } + for n in all_nodes + if n.pagerank > 0.0 + ] + candidates.sort(key=lambda x: (x["doc_words"], -x["pagerank"])) + onboarding_targets = candidates[:10] + + return { + "top_owners": top_owners, + "knowledge_silos": knowledge_silos, + "onboarding_targets": onboarding_targets, + } diff --git a/packages/web/src/app/repos/[id]/blast-radius/page.tsx b/packages/web/src/app/repos/[id]/blast-radius/page.tsx new file mode 100644 index 0000000..d3c1545 --- /dev/null +++ b/packages/web/src/app/repos/[id]/blast-radius/page.tsx @@ -0,0 +1,368 @@ +"use client"; + +import { useState } from "react"; +import { useParams } from "next/navigation"; +import { Radar } from "lucide-react"; +import { Button } from "@/components/ui/button"; +import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; +import { cn } from "@/lib/utils/cn"; +import { + analyzeBlastRadius, + type BlastRadiusResponse, + type DirectRiskEntry, + type TransitiveEntry, + type CochangeWarning, + type ReviewerEntry, +} from "@/lib/api/blast-radius"; + +// --------------------------------------------------------------------------- +// Risk score gauge card +// --------------------------------------------------------------------------- + 
+function RiskScoreCard({ score }: { score: number }) {
+  const color =
+    score >= 7
+      ? "text-red-500 border-red-500/30 bg-red-500/5"
+      : score >= 4
+        ? "text-amber-500 border-amber-500/30 bg-amber-500/5"
+        : "text-emerald-500 border-emerald-500/30 bg-emerald-500/5";
+  const label = score >= 7 ? "High Risk" : score >= 4 ? "Medium Risk" : "Low Risk";
+
+  return (
+    <Card className={cn("border", color)}>
+      <CardContent className="flex flex-col items-center gap-1 py-6">
+        <span className="text-5xl font-bold tabular-nums">{score.toFixed(1)}</span>
+        <span className="text-sm font-medium">{label}</span>
+        <span className="text-xs text-muted-foreground">Overall Risk Score (0–10)</span>
+      </CardContent>
+    </Card>
+  );
+}
+
+// ---------------------------------------------------------------------------
+// Simple table helpers
+// ---------------------------------------------------------------------------
+
+function TableSection({
+  title,
+  children,
+  empty,
+}: {
+  title: string;
+  children: React.ReactNode;
+  empty: boolean;
+}) {
+  return (
+    <Card>
+      <CardHeader>
+        <CardTitle className="text-base">{title}</CardTitle>
+      </CardHeader>
+      <CardContent>
+        {empty ? (
+          <p className="text-sm text-muted-foreground">None</p>
+        ) : (
+          children
+        )}
+      </CardContent>
+    </Card>
+  );
+}
+
+function Th({ children }: { children: React.ReactNode }) {
+  return (
+    <th className="px-3 py-2 text-left text-xs font-medium text-muted-foreground">
+      {children}
+    </th>
+  );
+}
+
+function Td({ children, className }: { children: React.ReactNode; className?: string }) {
+  return <td className={cn("px-3 py-2 text-sm", className)}>{children}</td>;
+}
+
+// ---------------------------------------------------------------------------
+// Result tables
+// ---------------------------------------------------------------------------
+
+function DirectRisksTable({ rows }: { rows: DirectRiskEntry[] }) {
+  return (
+    <table className="w-full text-left">
+      <thead>
+        <tr className="border-b">
+          <Th>File</Th>
+          <Th>Risk Score</Th>
+          <Th>Temporal Hotspot</Th>
+          <Th>Centrality</Th>
+        </tr>
+      </thead>
+      <tbody>
+        {rows.map((r) => (
+          <tr key={r.path} className="border-b last:border-0">
+            <Td className="font-mono text-xs">{r.path}</Td>
+            <Td>{r.risk_score.toFixed(4)}</Td>
+            <Td>{r.temporal_hotspot.toFixed(4)}</Td>
+            <Td>{r.centrality.toFixed(6)}</Td>
+          </tr>
+        ))}
+      </tbody>
+    </table>
+  );
+}
+
+function TransitiveTable({ rows }: { rows: TransitiveEntry[] }) {
+  return (
+    <table className="w-full text-left">
+      <thead>
+        <tr className="border-b">
+          <Th>File</Th>
+          <Th>Depth</Th>
+        </tr>
+      </thead>
+      <tbody>
+        {rows.map((r) => (
+          <tr key={r.path} className="border-b last:border-0">
+            <Td className="font-mono text-xs">{r.path}</Td>
+            <Td>{r.depth}</Td>
+          </tr>
+        ))}
+      </tbody>
+    </table>
+  );
+}
+
+function CochangeTable({ rows }: { rows: CochangeWarning[] }) {
+  return (
+    <table className="w-full text-left">
+      <thead>
+        <tr className="border-b">
+          <Th>Changed File</Th>
+          <Th>Missing Partner</Th>
+          <Th>Co-change Count</Th>
+        </tr>
+      </thead>
+      <tbody>
+        {rows.map((r, i) => (
+          <tr key={i} className="border-b last:border-0">
+            <Td className="font-mono text-xs">{r.changed}</Td>
+            <Td className="font-mono text-xs">{r.missing_partner}</Td>
+            <Td>{r.score}</Td>
+          </tr>
+        ))}
+      </tbody>
+    </table>
+  );
+}
+
+function ReviewersTable({ rows }: { rows: ReviewerEntry[] }) {
+  return (
+    <table className="w-full text-left">
+      <thead>
+        <tr className="border-b">
+          <Th>Email</Th>
+          <Th>Files Owned</Th>
+          <Th>Avg Ownership %</Th>
+        </tr>
+      </thead>
+      <tbody>
+        {rows.map((r) => (
+          <tr key={r.email} className="border-b last:border-0">
+            <Td>{r.email}</Td>
+            <Td>{r.files}</Td>
+            <Td>{(r.ownership_pct * 100).toFixed(1)}%</Td>
+          </tr>
+        ))}
+      </tbody>
+    </table>
+  );
+}
+
+function TestGapsList({ gaps }: { gaps: string[] }) {
+  return (
+    <ul className="space-y-1">
+      {gaps.map((g) => (
+        <li key={g} className="flex items-start gap-2 text-sm">
+          <span>•</span>
+          <span className="font-mono text-xs">{g}</span>
+        </li>
+      ))}
+    </ul>
+  );
+}
+
+// ---------------------------------------------------------------------------
+// Page
+// ---------------------------------------------------------------------------
+
+export default function BlastRadiusPage() {
+  const params = useParams<{ id: string }>();
+  const repoId = params.id;
+
+  const [files, setFiles] = useState("");
+  const [maxDepth, setMaxDepth] = useState(3);
+  const [loading, setLoading] = useState(false);
+  const [error, setError] = useState<string | null>(null);
+  const [result, setResult] = useState<BlastRadiusResponse | null>(null);
+
+  const handleAnalyze = async () => {
+    const changedFiles = files
+      .split("\n")
+      .map((l) => l.trim())
+      .filter(Boolean);
+
+    if (changedFiles.length === 0) {
+      setError("Enter at least one file path.");
+      return;
+    }
+
+    setLoading(true);
+    setError(null);
+    setResult(null);
+
+    try {
+      const data = await analyzeBlastRadius(repoId, {
+        changed_files: changedFiles,
+        max_depth: maxDepth,
+      });
+      setResult(data);
+    } catch (err) {
+      setError(err instanceof Error ? err.message : "Analysis failed.");
+    } finally {
+      setLoading(false);
+    }
+  };
+
+  return (
+    <div className="space-y-6 p-6">
+      {/* Header */}
+      <div>
+        <h1 className="flex items-center gap-2 text-2xl font-semibold">
+          <Radar className="h-6 w-6" />
+          Blast Radius
+        </h1>
+        <p className="mt-1 text-sm text-muted-foreground">
+          Estimate the impact of a proposed PR — direct risks, transitive effects, reviewer
+          suggestions, and test gaps.
+        </p>
+      </div>
+
+      {/* Input form */}
+      <Card>
+        <CardHeader>
+          <CardTitle>Changed Files</CardTitle>
+        </CardHeader>
+        <CardContent>