diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..7cffb78 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,128 @@ +# Changelog + +All notable changes to repowise are documented here. +This project follows [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [0.2.0] — 2026-04-07 + +A large overhaul: faster indexing, smarter doc generation, transactional storage, +new analysis capabilities, and a completely revamped web UI that surfaces every +new signal — all without changing the eight MCP tool surface. + +### Added + +#### Pipeline & ingestion +- **Parallel indexing.** AST parsing now runs across all CPU cores via + `ProcessPoolExecutor`. Graph construction and git history indexing run + concurrently with `asyncio.gather`. Per-file git history fetched through a + thread executor with a semaphore. +- **RAG-aware doc generation.** Pages are generated in topological order; each + generation prompt now includes summaries of the file's direct dependencies, + pulled from the vector store of already-generated pages. +- **Atomic three-store coordinator.** New `AtomicStorageCoordinator` buffers + writes across SQL, the in-memory dependency graph, and the vector store, then + flushes them as a single transaction. Failure in any store rolls back all three. +- **Dynamic import hint extractors.** The dependency graph now captures edges + that pure AST parsing misses: Django `INSTALLED_APPS` / `ROOT_URLCONF` / + `MIDDLEWARE`, pytest `conftest.py` fixture wiring, and Node/TS path aliases + from `tsconfig.json` and `package.json` `exports`. + +#### Analysis +- **Temporal hotspot decay.** New `temporal_hotspot_score` column on + `git_metadata`, computed as `Σ exp(-ln2 · age_days / 180) · min(lines/100, 3)` + per commit. Hotspot ranking now uses this score; commits from a year ago + contribute ~25% as much as commits from today. +- **Percentile ranks via SQL window function.** `recompute_git_percentiles()` + is now a single `PERCENT_RANK() OVER (PARTITION BY repo ORDER BY ...)` UPDATE + instead of an in-Python sort. Faster and correct on large repos. +- **PR blast radius analyzer.** New `PRBlastRadiusAnalyzer` returns direct + risks, transitive affected files, co-change warnings, recommended reviewers, + test gaps, and an overall 0–10 risk score. Surfaced via `get_risk(changed_files=...)` + and a new web page. +- **Security pattern scanner.** Indexing now runs `SecurityScanner` over each + file. Findings (eval/exec, weak crypto, raw SQL string construction, + hardcoded secrets, `pickle.loads`, etc.) are stored in a new + `security_findings` table. +- **Knowledge map.** Top owners, "bus factor 1" knowledge silos (>80% single + owner), and high-centrality "onboarding targets" with thin documentation — + surfaced in `get_overview` and the web overview page. + +#### LLM cost tracking +- New `llm_costs` table records every LLM call (model, tokens, USD cost). +- `CostTracker` aggregates session totals; pricing covers Claude 4.6 family, + GPT-4.1 family, and Gemini. +- New `repowise costs` CLI: `--since`, `--by operation|model|day`. +- Indexing progress bar shows a live `Cost: $X.XXXX` counter. + +#### MCP tool enhancements (still 8 tools — strictly more capable) +- `get_risk(targets, changed_files=None)` — when `changed_files` is provided, + returns the full PR blast-radius report (transitive affected, co-change + warnings, recommended reviewers, test gaps, overall 0–10 score). Per-file + responses now include `test_gap: bool` and `security_signals: list`. +- `get_overview()` — now includes a `knowledge_map` block (top owners, silos, + onboarding targets). +- `get_dead_code(min_confidence?, include_internals?, include_zombie_packages?)` — + sensitivity controls for false positives in framework-heavy code. + +#### REST endpoints (new) +- `GET /api/repos/{id}/costs` and `/costs/summary` — grouped LLM spend. +- `GET /api/repos/{id}/security` — security findings, filterable by file/severity. +- `POST /api/repos/{id}/blast-radius` — PR impact analysis. +- `GET /api/repos/{id}/knowledge-map` — owners / silos / onboarding targets. +- `GET /api/repos/{id}/health/coordinator` — three-store drift status. +- `GET /api/repos/{id}/hotspots` now returns `temporal_hotspot_score` and is + ordered by it. +- `GET /api/repos/{id}/git-metadata` now returns `test_gap`. +- Job SSE stream now emits `actual_cost_usd` (running cost since job start). + +#### Web UI (new pages and components) +- **Costs page** — daily bar chart, grouped tables by operation/model/day. +- **Blast Radius page** — paste files (or click hotspot suggestion chips) to + see risk gauge, transitive impact, co-change warnings, reviewers, test gaps. +- **Knowledge Map card** on the overview dashboard. +- **Trend column** on the hotspots table with flame indicator (default sort). +- **Security Panel** in the wiki page right sidebar. +- **"No tests" badge** on wiki pages with no detected test file. +- **System Health card** on the settings page (SQL / Vector / Graph counts + + drift % + status). +- **Live cost indicator** on the generation progress bar. + +#### CLI +- `repowise costs [--since DATE] [--by operation|model|day]` — new command. +- `repowise dead-code` — new flags `--min-confidence`, `--include-internals`, + `--include-zombie-packages`, `--no-unreachable`, `--no-unused-exports`. +- `repowise doctor` — new Check #10 reports coordinator drift across all + three stores. `--repair` deletes orphaned vectors and rebuilds missing graph + nodes from SQL. + +### Fixed +- C++ dependency resolution edge cases. +- Decision extraction timeout on very large histories. +- Resume / progress bar visibility for oversized files. +- Coordinator `health_check` falsely reporting 100% drift on LanceDB / Pg + vector stores (was returning -1 for the count). Now uses `list_page_ids()`. +- Coordinator `health_check` returning `null` graph node count when no + in-memory `GraphBuilder` is supplied. Now falls back to SQL `COUNT(*)`. + +### Internal +- Three new Alembic migrations: `0009_llm_costs`, `0010_temporal_hotspot_score`, + `0011_security_findings`. +- New module: `packages/core/.../persistence/coordinator.py` +- New module: `packages/core/.../ingestion/dynamic_hints/` (5 files) +- New module: `packages/core/.../analysis/pr_blast.py` +- New module: `packages/core/.../analysis/security_scan.py` +- New module: `packages/core/.../generation/cost_tracker.py` +- New module: `packages/server/.../services/knowledge_map.py` + +### Compatibility +- Existing repositories must run migrations: `repowise doctor` will detect + the missing tables and prompt; alternatively re-run `repowise init` to + rebuild from scratch. +- The eight MCP tool names and signatures are backwards compatible — new + parameters are all optional. + +--- + +## [0.1.31] — earlier + +See git history for releases prior to 0.2.0. diff --git a/README.md b/README.md index b880388..e70c930 100644 --- a/README.md +++ b/README.md @@ -94,11 +94,11 @@ Most tools are designed around data entities — one module, one file, one symbo |---|---|---| | `get_overview()` | Architecture summary, module map, entry points | First call on any unfamiliar codebase | | `get_context(targets, include?)` | Docs, ownership, decisions, freshness for any targets — files, modules, or symbols | Before reading or modifying code. Pass all relevant targets in one call. | -| `get_risk(targets)` | Hotspot scores, dependents, co-change partners, plain-English risk summary | Before modifying files — understand what could break | +| `get_risk(targets?, changed_files?)` | Hotspot scores, dependents, co-change partners, blast radius, recommended reviewers, test gaps, security signals, 0–10 risk score | Before modifying files — understand what could break | | `get_why(query?)` | Three modes: NL search over decisions · path-based decisions for a file · no-arg health dashboard | Before architectural changes — understand existing intent | | `search_codebase(query)` | Semantic search over the full wiki. Natural language. | When you don't know where something lives | | `get_dependency_path(from, to)` | Connection path between two files, modules, or symbols | When tracing how two things are connected | -| `get_dead_code()` | Unreachable code sorted by confidence and cleanup impact | Cleanup tasks | +| `get_dead_code(min_confidence?, include_internals?, include_zombie_packages?)` | Unreachable code sorted by confidence and cleanup impact | Cleanup tasks | | `get_architecture_diagram(module?)` | Mermaid diagram for the repo or a specific module | Documentation and presentation | ### Tool call comparison — a real task @@ -172,9 +172,13 @@ This is what happens when an AI agent has real codebase intelligence. | **Symbols** | Searchable index of every function, class, and method | | **Coverage** | Doc freshness per file with one-click regeneration | | **Ownership** | Contributor attribution and bus factor risk | -| **Hotspots** | Ranked high-churn files with commit history | +| **Hotspots** | Ranked by trend-weighted score (180-day decay) and churn | | **Dead Code** | Unused code with confidence scores and bulk actions | | **Decisions** | Architectural decisions with staleness monitoring | +| **Costs** | LLM spend by day, model, or operation, with running session totals | +| **Blast Radius** | Paste a PR file list, see transitive impact, reviewers, and test gaps | +| **Knowledge Map** | Top owners, bus-factor silos, and onboarding targets on the dashboard | +| **System Health** | SQL/vector/graph drift status from the atomic store coordinator | --- @@ -333,9 +337,18 @@ repowise search "" # semantic search over the wiki repowise status # coverage, freshness, dead code summary # Dead code -repowise dead-code # full report -repowise dead-code --safe-only # only safe-to-delete findings -repowise dead-code resolve # mark resolved / false positive +repowise dead-code # full report +repowise dead-code --safe-only # only safe-to-delete findings +repowise dead-code --min-confidence 0.8 # raise the confidence threshold +repowise dead-code --include-internals # include private/underscore symbols +repowise dead-code --include-zombie-packages # include unused declared packages +repowise dead-code resolve # mark resolved / false positive + +# Cost tracking +repowise costs # total LLM spend to date +repowise costs --by operation # grouped by operation type +repowise costs --by model # grouped by model +repowise costs --by day # grouped by day # Decisions repowise decision add # record a decision (interactive) @@ -348,7 +361,8 @@ repowise generate-claude-md # regenerate CLAUDE.md # Utilities repowise export [PATH] # export wiki as markdown files -repowise doctor # check setup, API keys, connectivity +repowise doctor # check setup, API keys, store drift +repowise doctor --repair # check and fix detected store mismatches repowise reindex # rebuild vector store (no LLM calls) ``` diff --git a/packages/cli/src/repowise/cli/__init__.py b/packages/cli/src/repowise/cli/__init__.py index 69cc60a..34ca0c8 100644 --- a/packages/cli/src/repowise/cli/__init__.py +++ b/packages/cli/src/repowise/cli/__init__.py @@ -6,4 +6,4 @@ AI-generated documentation. """ -__version__ = "0.1.31" +__version__ = "0.2.0" diff --git a/packages/cli/src/repowise/cli/commands/costs_cmd.py b/packages/cli/src/repowise/cli/commands/costs_cmd.py new file mode 100644 index 0000000..000eb84 --- /dev/null +++ b/packages/cli/src/repowise/cli/commands/costs_cmd.py @@ -0,0 +1,157 @@ +"""``repowise costs`` — display LLM cost history from the cost ledger.""" + +from __future__ import annotations + +from datetime import datetime +from pathlib import Path +from typing import Any + +import click +from rich.table import Table + +from repowise.cli.helpers import ( + console, + get_db_url_for_repo, + resolve_repo_path, + run_async, +) + + +def _parse_date(value: str | None) -> datetime | None: + """Parse an ISO date string into a datetime, or return None.""" + if value is None: + return None + try: + return datetime.fromisoformat(value) + except ValueError: + try: + from dateutil.parser import parse as _parse # type: ignore[import-untyped] + + return _parse(value) + except Exception as exc: + raise click.BadParameter(f"Cannot parse date '{value}': {exc}") from exc + + +@click.command("costs") +@click.argument("path", required=False, default=None) +@click.option( + "--since", + default=None, + metavar="DATE", + help="Only show costs since this date (ISO format, e.g. 2026-01-01).", +) +@click.option( + "--by", + "group_by", + type=click.Choice(["operation", "model", "day"]), + default="operation", + show_default=True, + help="Group costs by operation, model, or day.", +) +@click.option( + "--repo-path", + "repo_path_flag", + default=None, + metavar="PATH", + help="Repository path (defaults to current directory).", +) +def costs_command( + path: str | None, + since: str | None, + group_by: str, + repo_path_flag: str | None, +) -> None: + """Show LLM cost history for a repository. + + PATH (or --repo-path) defaults to the current directory. + """ + # Support both positional PATH and --repo-path flag + raw_path = path or repo_path_flag + repo_path = resolve_repo_path(raw_path) + + repowise_dir = repo_path / ".repowise" + if not repowise_dir.exists(): + console.print("[yellow]No .repowise/ directory found. Run 'repowise init' first.[/yellow]") + return + + since_dt = _parse_date(since) + + rows = run_async(_query_costs(repo_path, since=since_dt, group_by=group_by)) + + if not rows: + msg = "No cost records found" + if since_dt: + msg += f" since {since_dt.date()}" + msg += ". Run 'repowise init' with an LLM provider to generate costs." + console.print(f"[yellow]{msg}[/yellow]") + return + + # Build table + group_label = group_by.capitalize() + table = Table( + title=f"LLM Costs — grouped by {group_by}", + border_style="dim", + show_footer=True, + ) + table.add_column(group_label, style="cyan", footer="[bold]TOTAL[/bold]") + table.add_column("Calls", justify="right", footer=str(sum(r["calls"] for r in rows))) + table.add_column( + "Input Tokens", + justify="right", + footer=f"{sum(r['input_tokens'] for r in rows):,}", + ) + table.add_column( + "Output Tokens", + justify="right", + footer=f"{sum(r['output_tokens'] for r in rows):,}", + ) + table.add_column( + "Cost USD", + justify="right", + footer=f"[bold green]${sum(r['cost_usd'] for r in rows):.4f}[/bold green]", + ) + + for row in rows: + table.add_row( + str(row["group"] or "—"), + str(row["calls"]), + f"{row['input_tokens']:,}", + f"{row['output_tokens']:,}", + f"[green]${row['cost_usd']:.4f}[/green]", + ) + + console.print() + console.print(table) + console.print() + + +async def _query_costs( + repo_path: Path, + since: datetime | None, + group_by: str, +) -> list[dict[str, Any]]: + """Open the DB, look up the repo, and return aggregated cost rows.""" + from repowise.core.generation.cost_tracker import CostTracker + from repowise.core.persistence import ( + create_engine, + create_session_factory, + get_session, + init_db, + ) + from repowise.core.persistence.crud import get_repository_by_path + + url = get_db_url_for_repo(repo_path) + engine = create_engine(url) + await init_db(engine) + sf = create_session_factory(engine) + + try: + async with get_session(sf) as session: + repo = await get_repository_by_path(session, str(repo_path)) + if repo is None: + return [] + + tracker = CostTracker(session_factory=sf, repo_id=repo.id) + return await tracker.totals(since=since, group_by=group_by) + finally: + await engine.dispose() diff --git a/packages/cli/src/repowise/cli/commands/dead_code_cmd.py b/packages/cli/src/repowise/cli/commands/dead_code_cmd.py index ce4d7fc..e196cfd 100644 --- a/packages/cli/src/repowise/cli/commands/dead_code_cmd.py +++ b/packages/cli/src/repowise/cli/commands/dead_code_cmd.py @@ -30,12 +30,40 @@ type=click.Choice(["table", "json", "md"]), help="Output format.", ) +@click.option( + "--include-internals/--no-include-internals", + default=False, + help="Detect unused private/internal symbols (higher false-positive rate, off by default).", +) +@click.option( + "--include-zombie-packages/--no-include-zombie-packages", + default=True, + help="Detect monorepo packages with no external importers (on by default).", +) +@click.option( + "--no-unreachable", + "no_unreachable", + is_flag=True, + default=False, + help="Skip detection of unreachable files (in_degree=0).", +) +@click.option( + "--no-unused-exports", + "no_unused_exports", + is_flag=True, + default=False, + help="Skip detection of unused public exports.", +) def dead_code_command( path: str | None, min_confidence: float, safe_only: bool, kind: str | None, fmt: str, + include_internals: bool, + include_zombie_packages: bool, + no_unreachable: bool, + no_unused_exports: bool, ) -> None: """Detect dead and unused code.""" from pathlib import Path as PathlibPath @@ -74,9 +102,15 @@ def dead_code_command( pass # Analyze - config = {"min_confidence": min_confidence} + config: dict = { + "min_confidence": min_confidence, + "detect_unused_internals": include_internals, + "detect_zombie_packages": include_zombie_packages, + "detect_unreachable_files": not no_unreachable, + "detect_unused_exports": not no_unused_exports, + } if kind: - # Enable only the requested kind + # --kind overrides the individual detection flags to focus on one type config["detect_unreachable_files"] = kind == "unreachable_file" config["detect_unused_exports"] = kind == "unused_export" config["detect_unused_internals"] = kind == "unused_internal" diff --git a/packages/cli/src/repowise/cli/commands/doctor_cmd.py b/packages/cli/src/repowise/cli/commands/doctor_cmd.py index 0416eb6..bf25b01 100644 --- a/packages/cli/src/repowise/cli/commands/doctor_cmd.py +++ b/packages/cli/src/repowise/cli/commands/doctor_cmd.py @@ -226,6 +226,77 @@ async def _check_stores(): except Exception: checks.append(_check("Store consistency", True, "Could not check")) + # 10. AtomicStorageCoordinator drift check + coord_drift: float | None = None + coord_sql_pages: int | None = None + coord_vector_count: int | None = None + coord_graph_nodes: int | None = None + if db_ok: + try: + + async def _check_coordinator(): + from repowise.core.persistence import ( + create_engine, + create_session_factory, + get_session, + ) + from repowise.core.persistence.coordinator import AtomicStorageCoordinator + from repowise.core.persistence.vector_store import LanceDBVectorStore + from repowise.core.providers.embedding.base import MockEmbedder + + url = get_db_url_for_repo(repo_path) + engine = create_engine(url) + sf = create_session_factory(engine) + + vector_store = None + lance_dir = repowise_dir / "lancedb" + if lance_dir.exists(): + try: + embedder = MockEmbedder() + vector_store = LanceDBVectorStore(str(lance_dir), embedder=embedder) + except Exception: + pass + + async with get_session(sf) as session: + coord = AtomicStorageCoordinator( + session, graph_builder=None, vector_store=vector_store + ) + result = await coord.health_check() + + if vector_store is not None: + try: + await vector_store.close() + except Exception: + pass + await engine.dispose() + return result + + coord_result = run_async(_check_coordinator()) + coord_sql_pages = coord_result.get("sql_pages") + coord_vector_count = coord_result.get("vector_count") + coord_graph_nodes = coord_result.get("graph_nodes") + coord_drift = coord_result.get("drift") + + drift_pct = f"{coord_drift * 100:.1f}%" if coord_drift is not None else "N/A" + if coord_drift is None: + drift_color = "white" + elif coord_drift < 0.05: + drift_color = "green" + elif coord_drift < 0.15: + drift_color = "yellow" + else: + drift_color = "red" + + vec_display = str(coord_vector_count) if coord_vector_count != -1 and coord_vector_count is not None else "unknown" + drift_detail = ( + f"SQL={coord_sql_pages}, Vector={vec_display}, " + f"Drift=[{drift_color}]{drift_pct}[/{drift_color}]" + ) + coord_ok = coord_drift is None or coord_drift < 0.05 + checks.append(_check("Coordinator drift", coord_ok, drift_detail)) + except Exception as exc: + checks.append(_check("Coordinator drift", True, f"Could not check: {exc}")) + # Display table = Table(title="repowise Doctor") table.add_column("Check", style="cyan") diff --git a/packages/cli/src/repowise/cli/commands/init_cmd.py b/packages/cli/src/repowise/cli/commands/init_cmd.py index ba706e8..1ba9bec 100644 --- a/packages/cli/src/repowise/cli/commands/init_cmd.py +++ b/packages/cli/src/repowise/cli/commands/init_cmd.py @@ -411,6 +411,7 @@ def init_command( BarColumn(), TextColumn("{task.completed}/{task.total}"), TimeElapsedColumn(), + TextColumn("[green]${task.fields[cost]:.3f}[/green]"), console=console, ) as progress_bar: callback = RichProgressCallback(progress_bar, console) @@ -520,10 +521,51 @@ def init_command( BarColumn(), TextColumn("{task.completed}/{task.total}"), TimeElapsedColumn(), + TextColumn("[green]${task.fields[cost]:.3f}[/green]"), console=console, ) as gen_progress: gen_callback = RichProgressCallback(gen_progress, console) + # Construct a CostTracker backed by the real DB so every LLM call + # is persisted to the llm_costs table. We need the repo_id from the + # database row that was created/upserted during _persist_result + # (which has not run yet), so we look it up or fall back to in-memory. + from repowise.core.generation.cost_tracker import CostTracker + from repowise.cli.helpers import get_db_url_for_repo + from repowise.core.persistence import ( + create_engine as _create_engine, + create_session_factory as _create_sf, + get_session as _get_session, + init_db as _init_db, + upsert_repository as _upsert_repo, + ) + + async def _make_cost_tracker() -> CostTracker: + url = get_db_url_for_repo(repo_path) + engine = _create_engine(url) + await _init_db(engine) + sf = _create_sf(engine) + async with _get_session(sf) as _sess: + _repo = await _upsert_repo( + _sess, + name=result.repo_name, + local_path=str(repo_path), + ) + _repo_id = _repo.id + # Keep engine alive for the duration of generation — it will be + # disposed by _persist_result's own engine later. + return CostTracker(session_factory=sf, repo_id=_repo_id) + + try: + cost_tracker = run_async(_make_cost_tracker()) + except Exception: + # Fallback to in-memory tracker if DB setup fails + cost_tracker = CostTracker() + + # Attach tracker to provider unconditionally (all providers now + # accept _cost_tracker as an attribute) + provider._cost_tracker = cost_tracker + generated_pages = run_async( run_generation( repo_path=repo_path, @@ -538,6 +580,7 @@ def init_command( concurrency=concurrency, progress=gen_callback, resume=resume, + cost_tracker=cost_tracker, ) ) diff --git a/packages/cli/src/repowise/cli/main.py b/packages/cli/src/repowise/cli/main.py index 02dd069..7fa3650 100644 --- a/packages/cli/src/repowise/cli/main.py +++ b/packages/cli/src/repowise/cli/main.py @@ -6,6 +6,7 @@ from repowise.cli import __version__ from repowise.cli.commands.claude_md_cmd import claude_md_command +from repowise.cli.commands.costs_cmd import costs_command from repowise.cli.commands.dead_code_cmd import dead_code_command from repowise.cli.commands.decision_cmd import decision_group from repowise.cli.commands.doctor_cmd import doctor_command @@ -28,6 +29,7 @@ def cli() -> None: cli.add_command(init_command) cli.add_command(claude_md_command) +cli.add_command(costs_command) cli.add_command(update_command) cli.add_command(dead_code_command) cli.add_command(decision_group) diff --git a/packages/cli/src/repowise/cli/ui.py b/packages/cli/src/repowise/cli/ui.py index 3c500d1..633c052 100644 --- a/packages/cli/src/repowise/cli/ui.py +++ b/packages/cli/src/repowise/cli/ui.py @@ -569,7 +569,9 @@ def on_phase_start(self, phase: str, total: int | None) -> None: if phase in self._tasks: self._progress.update(self._tasks[phase], total=total, visible=True) else: - self._tasks[phase] = self._progress.add_task(label, total=total, visible=True) + self._tasks[phase] = self._progress.add_task( + label, total=total, visible=True, cost=0.0 + ) def on_item_done(self, phase: str) -> None: if phase in self._tasks: @@ -582,3 +584,11 @@ def on_message(self, level: str, text: str) -> None: self._progress.console.print(f" [{style}]{text}[/{style}]") else: self._progress.console.print(f" {text}") + + def set_cost(self, total_cost: float) -> None: + """Update the live cost display on all active progress tasks.""" + for task_id in self._tasks.values(): + try: + self._progress.update(task_id, cost=total_cost) + except Exception: + pass diff --git a/packages/core/alembic/versions/0009_llm_costs.py b/packages/core/alembic/versions/0009_llm_costs.py new file mode 100644 index 0000000..328ec5d --- /dev/null +++ b/packages/core/alembic/versions/0009_llm_costs.py @@ -0,0 +1,54 @@ +"""Add llm_costs table for runtime LLM cost tracking. + +Revision ID: 0009 +Revises: 0008 +Create Date: 2026-04-07 +""" + +from __future__ import annotations + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +# revision identifiers +revision: str = "0009" +down_revision: str | None = "0008" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.create_table( + "llm_costs", + sa.Column("id", sa.Integer, primary_key=True, autoincrement=True), + sa.Column( + "repository_id", + sa.String(32), + sa.ForeignKey("repositories.id", ondelete="CASCADE"), + nullable=False, + ), + sa.Column( + "ts", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.text("CURRENT_TIMESTAMP"), + ), + sa.Column("model", sa.String(100), nullable=False), + sa.Column("operation", sa.String(50), nullable=False), + sa.Column("input_tokens", sa.Integer, nullable=False), + sa.Column("output_tokens", sa.Integer, nullable=False), + sa.Column("cost_usd", sa.Float, nullable=False), + sa.Column("file_path", sa.String(1024), nullable=True), + ) + op.create_index( + "ix_llm_costs_repository_ts", + "llm_costs", + ["repository_id", "ts"], + ) + + +def downgrade() -> None: + op.drop_index("ix_llm_costs_repository_ts", table_name="llm_costs") + op.drop_table("llm_costs") diff --git a/packages/core/alembic/versions/0010_temporal_hotspot_score.py b/packages/core/alembic/versions/0010_temporal_hotspot_score.py new file mode 100644 index 0000000..b2ce829 --- /dev/null +++ b/packages/core/alembic/versions/0010_temporal_hotspot_score.py @@ -0,0 +1,38 @@ +"""Add temporal_hotspot_score column to git_metadata. + +Stores an exponentially time-decayed churn score used as the primary +signal for hotspot percentile ranking (PERCENT_RANK window function). + +Revision ID: 0010 +Revises: 0009 +Create Date: 2026-04-07 +""" + +from __future__ import annotations + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +# revision identifiers +revision: str = "0010" +down_revision: str | None = "0009" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.add_column( + "git_metadata", + sa.Column( + "temporal_hotspot_score", + sa.Float, + nullable=True, + server_default="0.0", + ), + ) + + +def downgrade() -> None: + op.drop_column("git_metadata", "temporal_hotspot_score") diff --git a/packages/core/alembic/versions/0011_security_findings.py b/packages/core/alembic/versions/0011_security_findings.py new file mode 100644 index 0000000..86e01f6 --- /dev/null +++ b/packages/core/alembic/versions/0011_security_findings.py @@ -0,0 +1,51 @@ +"""Add security_findings table. + +Stores lightweight security signals detected during file ingestion, +including eval/exec calls, hardcoded secrets, raw SQL, weak hashes, etc. + +Revision ID: 0011 +Revises: 0010 +Create Date: 2026-04-07 +""" + +from __future__ import annotations + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +# revision identifiers +revision: str = "0011" +down_revision: str | None = "0010" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + op.create_table( + "security_findings", + sa.Column("id", sa.Integer, primary_key=True, autoincrement=True), + sa.Column("repository_id", sa.String(32), nullable=False), + sa.Column("file_path", sa.String(1024), nullable=False), + sa.Column("kind", sa.String(100), nullable=False), + sa.Column("severity", sa.String(20), nullable=False), + sa.Column("snippet", sa.Text, nullable=True), + sa.Column("line_number", sa.Integer, nullable=True), + sa.Column( + "detected_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.func.now(), + ), + ) + op.create_index( + "ix_security_findings_repo_file", + "security_findings", + ["repository_id", "file_path"], + ) + + +def downgrade() -> None: + op.drop_index("ix_security_findings_repo_file", table_name="security_findings") + op.drop_table("security_findings") diff --git a/packages/core/src/repowise/core/__init__.py b/packages/core/src/repowise/core/__init__.py index 7550891..de95752 100644 --- a/packages/core/src/repowise/core/__init__.py +++ b/packages/core/src/repowise/core/__init__.py @@ -6,4 +6,4 @@ Namespace package: repowise.core is part of the repowise namespace. """ -__version__ = "0.1.31" +__version__ = "0.2.0" diff --git a/packages/core/src/repowise/core/analysis/pr_blast.py b/packages/core/src/repowise/core/analysis/pr_blast.py new file mode 100644 index 0000000..3404714 --- /dev/null +++ b/packages/core/src/repowise/core/analysis/pr_blast.py @@ -0,0 +1,282 @@ +"""PR blast radius analyzer. + +Given a set of changed files, computes: + - Direct risk per file (hotspot * centrality) + - Transitive affected files (graph ancestors up to max_depth) + - Co-change warnings (historical co-change partners NOT in the PR) + - Recommended reviewers (top owners of affected files) + - Test gaps (affected files without a corresponding test file) + - Overall risk score (0-10) + +Reuses existing data: graph_nodes/graph_edges (SQL), git_metadata, and the +co_change_partners_json field stored in git_metadata rows. +""" + +from __future__ import annotations + +import json +import os +from collections import defaultdict +from typing import Any + +from sqlalchemy import select, text +from sqlalchemy.ext.asyncio import AsyncSession + +from repowise.core.persistence.models import GitMetadata, GraphNode + + +class PRBlastRadiusAnalyzer: + """Compute blast radius for a proposed PR given its changed files.""" + + def __init__(self, session: AsyncSession, repo_id: str) -> None: + self._session = session + self._repo_id = repo_id + + async def analyze_files( + self, + changed_files: list[str], + max_depth: int = 3, + ) -> dict: + """Return full blast-radius analysis for the given changed files. + + Parameters + ---------- + changed_files: + Relative file paths that are modified in the PR. + max_depth: + Maximum BFS depth for transitive ancestor lookup. + """ + changed_set = set(changed_files) + + # 1. Per-file direct risk + direct_risks = await self._score_files(changed_files) + + # 2. Transitive affected files + transitive_affected = await self._transitive_affected(changed_files, max_depth) + all_affected_paths = list(changed_set | {e["path"] for e in transitive_affected}) + + # 3. Co-change warnings + cochange_warnings = await self._cochange_warnings(changed_files, changed_set) + + # 4. Recommended reviewers (over all affected files) + recommended_reviewers = await self._recommend_reviewers(all_affected_paths) + + # 5. Test gaps + test_gaps = await self._find_test_gaps(all_affected_paths) + + # 6. Overall risk score (0-10) + overall_risk_score = self._compute_overall_risk(direct_risks, transitive_affected) + + return { + "direct_risks": direct_risks, + "transitive_affected": transitive_affected, + "cochange_warnings": cochange_warnings, + "recommended_reviewers": recommended_reviewers, + "test_gaps": test_gaps, + "overall_risk_score": overall_risk_score, + } + + # ------------------------------------------------------------------ + # Private helpers + # ------------------------------------------------------------------ + + async def _score_files(self, paths: list[str]) -> list[dict]: + """Return direct risk records for each changed file.""" + if not paths: + return [] + + # Fetch git_metadata for all paths in one query + res = await self._session.execute( + select(GitMetadata).where( + GitMetadata.repository_id == self._repo_id, + GitMetadata.file_path.in_(paths), + ) + ) + meta_by_path: dict[str, Any] = {m.file_path: m for m in res.scalars().all()} + + # Fetch graph node pagerank (used as centrality proxy) + node_res = await self._session.execute( + select(GraphNode).where( + GraphNode.repository_id == self._repo_id, + GraphNode.node_id.in_(paths), + ) + ) + node_by_path: dict[str, Any] = {n.node_id: n for n in node_res.scalars().all()} + + results = [] + for path in paths: + meta = meta_by_path.get(path) + node = node_by_path.get(path) + temporal = float(getattr(meta, "temporal_hotspot_score", 0.0) or 0.0) + centrality = float(getattr(node, "pagerank", 0.0) or 0.0) + risk_score = self._score_file(temporal, centrality) + results.append( + { + "path": path, + "risk_score": round(risk_score, 4), + "temporal_hotspot": round(temporal, 4), + "centrality": round(centrality, 6), + } + ) + + results.sort(key=lambda x: -x["risk_score"]) + return results + + @staticmethod + def _score_file(temporal_hotspot_score: float, centrality: float) -> float: + """Compute file-level risk: centrality * (1 + temporal_hotspot_score).""" + return centrality * (1.0 + temporal_hotspot_score) + + async def _transitive_affected( + self, changed_files: list[str], max_depth: int + ) -> list[dict]: + """BFS over reverse graph edges (source_node_id -> target_node_id direction). + + We want files that *import* the changed files (i.e. are affected when a + changed file changes). In graph_edges, an edge means + ``source imports target``, so we look for rows where + ``target_node_id IN (frontier)`` and collect the ``source_node_id`` + values — those are the files that depend on our changed set. + """ + visited: dict[str, int] = {} # path -> depth at which it was first reached + frontier = list(set(changed_files)) + + for depth in range(1, max_depth + 1): + if not frontier: + break + # SQLite / SQLAlchemy compatible IN query via text() + placeholders = ",".join(f":p{i}" for i in range(len(frontier))) + params: dict[str, Any] = {"repo_id": self._repo_id} + params.update({f"p{i}": v for i, v in enumerate(frontier)}) + rows = await self._session.execute( + text( + f"SELECT DISTINCT source_node_id FROM graph_edges " + f"WHERE repository_id = :repo_id " + f"AND target_node_id IN ({placeholders})" + ), + params, + ) + next_frontier = [] + for (src,) in rows: + if src not in visited and src not in set(changed_files): + visited[src] = depth + next_frontier.append(src) + frontier = next_frontier + + return [{"path": p, "depth": d} for p, d in sorted(visited.items(), key=lambda x: x[1])] + + async def _cochange_warnings( + self, changed_files: list[str], changed_set: set[str] + ) -> list[dict]: + """Return co-change partners of changed files that are NOT in the PR.""" + if not changed_files: + return [] + + res = await self._session.execute( + select(GitMetadata).where( + GitMetadata.repository_id == self._repo_id, + GitMetadata.file_path.in_(changed_files), + ) + ) + + warnings = [] + for meta in res.scalars().all(): + partners = json.loads(meta.co_change_partners_json or "[]") + for partner in partners: + partner_path = partner.get("file_path") or partner.get("path") or "" + score = float(partner.get("co_change_count") or partner.get("count") or 0) + if partner_path and partner_path not in changed_set: + warnings.append( + { + "changed": meta.file_path, + "missing_partner": partner_path, + "score": score, + } + ) + + warnings.sort(key=lambda x: -x["score"]) + return warnings + + async def _recommend_reviewers(self, affected_files: list[str]) -> list[dict]: + """Aggregate top owners of affected files; return top 5.""" + if not affected_files: + return [] + + res = await self._session.execute( + select(GitMetadata).where( + GitMetadata.repository_id == self._repo_id, + GitMetadata.file_path.in_(affected_files), + ) + ) + + owner_files: dict[str, list[float]] = defaultdict(list) + for meta in res.scalars().all(): + email = meta.primary_owner_email or "" + pct = float(meta.primary_owner_commit_pct or 0.0) + if email: + owner_files[email].append(pct) + + reviewers = [ + { + "email": email, + "files": len(pcts), + "ownership_pct": round(sum(pcts) / len(pcts), 3) if pcts else 0.0, + } + for email, pcts in owner_files.items() + ] + reviewers.sort(key=lambda x: (-x["files"], -x["ownership_pct"])) + return reviewers[:5] + + async def _find_test_gaps(self, affected_files: list[str]) -> list[str]: + """Return files that lack a corresponding test file. + + Checks graph_nodes for paths matching test_, _test, or + .spec.* patterns. + """ + if not affected_files: + return [] + + node_res = await self._session.execute( + select(GraphNode.node_id).where( + GraphNode.repository_id == self._repo_id, + GraphNode.is_test == True, # noqa: E712 + ) + ) + test_paths = {row[0] for row in node_res.all()} + + gaps = [] + for path in affected_files: + base = os.path.splitext(os.path.basename(path))[0] + ext = os.path.splitext(path)[1].lstrip(".") + has_test = any( + ( + f"test_{base}" in tp + or f"{base}_test" in tp + or f"{base}.spec.{ext}" in tp + or f"{base}.spec." in tp + ) + for tp in test_paths + ) + if not has_test: + gaps.append(path) + + return gaps + + @staticmethod + def _compute_overall_risk( + direct_risks: list[dict], + transitive_affected: list[dict], + ) -> float: + """Compute overall risk score on 0-10 scale.""" + if not direct_risks: + return 0.0 + + avg_direct = sum(r["risk_score"] for r in direct_risks) / len(direct_risks) + max_direct = max(r["risk_score"] for r in direct_risks) + breadth_bonus = min(len(transitive_affected) / 20.0, 1.0) # 0-1 + + # Weighted: 40% avg, 40% max, 20% breadth — scaled to 10 + raw = (0.4 * avg_direct + 0.4 * max_direct + 0.2 * breadth_bonus) + # Normalise pagerank-based scores (typically << 1) to 0-10 + score = min(raw * 100.0, 10.0) + return round(score, 2) diff --git a/packages/core/src/repowise/core/analysis/security_scan.py b/packages/core/src/repowise/core/analysis/security_scan.py new file mode 100644 index 0000000..d9abc66 --- /dev/null +++ b/packages/core/src/repowise/core/analysis/security_scan.py @@ -0,0 +1,128 @@ +"""Lightweight security signal extractor. + +Scans indexed symbols and source for keyword/regex patterns that indicate +authentication, secret handling, raw SQL, dangerous deserialization, etc. + +Stores findings in the security_findings table (see migration 0011). +""" + +from __future__ import annotations + +import re +from datetime import UTC, datetime +from typing import Any + +from sqlalchemy.ext.asyncio import AsyncSession + +# --------------------------------------------------------------------------- +# Pattern registry: (compiled_pattern, kind_label, severity) +# --------------------------------------------------------------------------- +_PATTERNS: list[tuple[re.Pattern, str, str]] = [ + (re.compile(r"eval\s*\("), "eval_call", "high"), + (re.compile(r"exec\s*\("), "exec_call", "high"), + (re.compile(r"pickle\.loads"), "pickle_loads", "high"), + (re.compile(r"subprocess\..*shell\s*=\s*True"), "subprocess_shell_true", "high"), + (re.compile(r"os\.system"), "os_system", "high"), + (re.compile(r"password\s*=\s*['\"]"), "hardcoded_password", "high"), + (re.compile(r"(?:api_?key|secret)\s*=\s*['\"]"), "hardcoded_secret", "high"), + (re.compile(r'f[\'"].*SELECT.*\{.*\}'), "fstring_sql", "med"), + (re.compile(r'\.execute\(\s*[\'\"]\s*SELECT.*\+'), "concat_sql", "med"), + (re.compile(r"verify\s*=\s*False"), "tls_verify_false", "med"), + (re.compile(r"\bmd5\b|\bsha1\b"), "weak_hash", "low"), +] + +# Symbol names that are informational security hotspots +_SYMBOL_KEYWORDS = re.compile( + r"\b(auth|token|password|jwt|session|crypto)\b", re.IGNORECASE +) + + +class SecurityScanner: + """Scan a single file for security signals and persist to the database.""" + + def __init__(self, session: AsyncSession, repo_id: str) -> None: + self._session = session + self._repo_id = repo_id + + async def scan_file( + self, + file_path: str, + source: str, + symbols: list[Any], + ) -> list[dict]: + """Scan *source* text and symbol names; return list of finding dicts. + + Parameters + ---------- + file_path: + Relative path of the file (for reference only; not used in scan). + source: + Full text content of the file. + symbols: + List of symbol objects that have a ``name`` attribute (or similar). + """ + findings: list[dict] = [] + lines = source.splitlines() + + # Line-by-line pattern scan + for lineno, line in enumerate(lines, start=1): + for pattern, kind, severity in _PATTERNS: + if pattern.search(line): + # Trim snippet to keep it concise + snippet = line.strip()[:120] + findings.append( + { + "kind": kind, + "severity": severity, + "snippet": snippet, + "line": lineno, + } + ) + + # Symbol-name scan (informational / low) + for sym in symbols: + name = getattr(sym, "name", "") or getattr(sym, "qualified_name", "") or "" + if name and _SYMBOL_KEYWORDS.search(name): + findings.append( + { + "kind": "security_sensitive_symbol", + "severity": "low", + "snippet": name, + "line": getattr(sym, "start_line", 0) or 0, + } + ) + + return findings + + async def persist(self, file_path: str, findings: list[dict]) -> None: + """Insert security findings into the security_findings table. + + Uses raw INSERT to stay independent of any ORM session state. + Silently skips if the table doesn't exist yet (pre-migration). + """ + from sqlalchemy import text + + if not findings: + return + + now = datetime.now(UTC) + for finding in findings: + try: + await self._session.execute( + text( + "INSERT INTO security_findings " + "(repository_id, file_path, kind, severity, snippet, line_number, detected_at) " + "VALUES (:repo_id, :file_path, :kind, :severity, :snippet, :line, :detected_at)" + ), + { + "repo_id": self._repo_id, + "file_path": file_path, + "kind": finding["kind"], + "severity": finding["severity"], + "snippet": finding.get("snippet", ""), + "line": finding.get("line", 0), + "detected_at": now, + }, + ) + except Exception: # noqa: BLE001 — table may not exist pre-migration + break diff --git a/packages/core/src/repowise/core/generation/cost_tracker.py b/packages/core/src/repowise/core/generation/cost_tracker.py new file mode 100644 index 0000000..542c131 --- /dev/null +++ b/packages/core/src/repowise/core/generation/cost_tracker.py @@ -0,0 +1,264 @@ +"""Runtime LLM cost tracking for repowise. + +Tracks token usage and cost per session, and optionally persists rows to +the ``llm_costs`` table for historical reporting via ``repowise costs``. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any + +import structlog + +log = structlog.get_logger(__name__) + +# --------------------------------------------------------------------------- +# Pricing table — USD per 1 million tokens +# --------------------------------------------------------------------------- + +_PRICING: dict[str, dict[str, float]] = { + # Anthropic + "claude-opus-4-6": {"input": 15.0, "output": 75.0}, + "claude-sonnet-4-6": {"input": 3.0, "output": 15.0}, + "claude-haiku-4-5": {"input": 0.8, "output": 4.0}, + "claude-3-5-sonnet-20241022": {"input": 3.0, "output": 15.0}, + # OpenAI + "gpt-4o": {"input": 2.5, "output": 10.0}, + "gpt-4o-mini": {"input": 0.15, "output": 0.6}, + # Google Gemini + "gemini-2.0-flash": {"input": 0.075, "output": 0.3}, + "gemini-2.5-flash": {"input": 0.30, "output": 2.50}, + "gemini-2.5-pro": {"input": 1.25, "output": 10.0}, + "gemini-1.5-flash": {"input": 0.075, "output": 0.30}, + "gemini-1.5-flash-lite": {"input": 0.075, "output": 0.30}, + "gemini-1.5-pro": {"input": 1.25, "output": 5.0}, + # Gemini preview / experimental models + "gemini-3.1-flash-lite-preview": {"input": 0.075, "output": 0.30}, + "gemini-3-flash-preview": {"input": 0.075, "output": 0.30}, +} + +_FALLBACK_PRICING: dict[str, float] = {"input": 3.0, "output": 15.0} + +# Track which unknown models we've already warned about (per-process) +_warned_models: set[str] = set() + + +def _get_pricing(model: str) -> dict[str, float]: + """Return pricing for *model*, falling back and warning if unknown.""" + if model in _PRICING: + return _PRICING[model] + if model not in _warned_models: + log.warning("cost_tracker.unknown_model", model=model, fallback=_FALLBACK_PRICING) + _warned_models.add(model) + return _FALLBACK_PRICING + + +# --------------------------------------------------------------------------- +# CostTracker +# --------------------------------------------------------------------------- + + +class CostTracker: + """Tracks LLM token usage and cost for a session. + + Optionally persists each call to the ``llm_costs`` table when a + *session_factory* (async SQLAlchemy sessionmaker) is supplied. + + Parameters + ---------- + session_factory: + Async SQLAlchemy sessionmaker. When ``None``, only in-memory + tracking is performed. + repo_id: + Repository primary key to associate with persisted rows. + """ + + def __init__( + self, + session_factory: Any | None = None, + repo_id: str | None = None, + ) -> None: + self._session_factory = session_factory + self._repo_id = repo_id + self._session_cost: float = 0.0 + self._session_tokens: int = 0 + + # ------------------------------------------------------------------ + # Properties + # ------------------------------------------------------------------ + + @property + def session_cost(self) -> float: + """Cumulative USD cost for this tracker instance.""" + return self._session_cost + + @property + def session_tokens(self) -> int: + """Cumulative tokens (input + output) for this tracker instance.""" + return self._session_tokens + + # ------------------------------------------------------------------ + # Recording + # ------------------------------------------------------------------ + + async def record( + self, + model: str, + input_tokens: int, + output_tokens: int, + operation: str, + file_path: str | None = None, + ) -> float: + """Record a single LLM call and return its cost in USD. + + Parameters + ---------- + model: + Model identifier, e.g. ``"claude-sonnet-4-6"``. + input_tokens: + Number of input/prompt tokens consumed. + output_tokens: + Number of output/completion tokens consumed. + operation: + Logical operation label, e.g. ``"doc_generation"`` or + ``"embedding"``. + file_path: + Source file being processed, if available. + + Returns + ------- + float + Cost in USD for this call. + """ + pricing = _get_pricing(model) + cost = (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000 + + self._session_cost += cost + self._session_tokens += input_tokens + output_tokens + + log.debug( + "cost_tracker.record", + model=model, + input_tokens=input_tokens, + output_tokens=output_tokens, + cost_usd=cost, + operation=operation, + file_path=file_path, + ) + + if self._session_factory is not None and self._repo_id is not None: + await self._persist( + model=model, + input_tokens=input_tokens, + output_tokens=output_tokens, + cost_usd=cost, + operation=operation, + file_path=file_path, + ) + + return cost + + async def _persist( + self, + *, + model: str, + input_tokens: int, + output_tokens: int, + cost_usd: float, + operation: str, + file_path: str | None, + ) -> None: + """Write a row to the ``llm_costs`` table.""" + try: + from repowise.core.persistence.models import LlmCost + from repowise.core.persistence import get_session + + async with get_session(self._session_factory) as session: + row = LlmCost( + repository_id=self._repo_id, + model=model, + operation=operation, + input_tokens=input_tokens, + output_tokens=output_tokens, + cost_usd=cost_usd, + file_path=file_path, + ) + session.add(row) + await session.commit() + except Exception as exc: + log.warning("cost_tracker.persist_failed", error=str(exc)) + + # ------------------------------------------------------------------ + # Querying + # ------------------------------------------------------------------ + + async def totals( + self, + since: datetime | None = None, + group_by: str = "operation", + ) -> list[dict]: + """Query aggregate cost totals from the database. + + Parameters + ---------- + since: + Only include rows whose ``ts`` is on or after this datetime. + group_by: + Grouping dimension: ``"operation"``, ``"model"``, or ``"day"``. + + Returns + ------- + list[dict] + Each dict has keys: ``group``, ``calls``, ``input_tokens``, + ``output_tokens``, ``cost_usd``. + """ + if self._session_factory is None or self._repo_id is None: + return [] + + try: + import sqlalchemy as sa + from repowise.core.persistence.models import LlmCost + from repowise.core.persistence import get_session + + async with get_session(self._session_factory) as session: + if group_by == "model": + group_col = LlmCost.model + elif group_by == "day": + # SQLite strftime; works for Postgres too with cast + group_col = sa.func.strftime("%Y-%m-%d", LlmCost.ts) + else: + group_col = LlmCost.operation + + stmt = ( + sa.select( + group_col.label("group"), + sa.func.count().label("calls"), + sa.func.sum(LlmCost.input_tokens).label("input_tokens"), + sa.func.sum(LlmCost.output_tokens).label("output_tokens"), + sa.func.sum(LlmCost.cost_usd).label("cost_usd"), + ) + .where(LlmCost.repository_id == self._repo_id) + .group_by(group_col) + .order_by(sa.func.sum(LlmCost.cost_usd).desc()) + ) + + if since is not None: + stmt = stmt.where(LlmCost.ts >= since) + + result = await session.execute(stmt) + rows = result.fetchall() + + return [ + { + "group": row.group, + "calls": row.calls, + "input_tokens": row.input_tokens or 0, + "output_tokens": row.output_tokens or 0, + "cost_usd": row.cost_usd or 0.0, + } + for row in rows + ] + except Exception as exc: + log.warning("cost_tracker.totals_failed", error=str(exc)) + return [] diff --git a/packages/core/src/repowise/core/generation/page_generator.py b/packages/core/src/repowise/core/generation/page_generator.py index 84843cb..4f0c9d3 100644 --- a/packages/core/src/repowise/core/generation/page_generator.py +++ b/packages/core/src/repowise/core/generation/page_generator.py @@ -482,6 +482,7 @@ async def guarded_named(page_id: str, coro: Any) -> Any: # Embed page for RAG (B1) if self._vector_store is not None and isinstance(result, GeneratedPage): try: + page_summary = _extract_summary(result.content) await self._vector_store.embed_and_upsert( result.page_id, result.content, @@ -489,6 +490,7 @@ async def guarded_named(page_id: str, coro: Any) -> Any: "page_type": result.page_type, "target_path": result.target_path, "content": result.content[:600], + "summary": page_summary, }, ) except Exception as e: @@ -668,10 +670,70 @@ async def guarded_named(page_id: str, coro: Any) -> Any: # Context is assembled for ALL code files (module pages need it). # Pages are generated only for files that cross the significance bar. # page_summaries from level 0+1 are available here (B2). + # + # Topo-sort: process leaves (no internal out-edges) before roots so that + # dependency summaries are available when assembling dependents' contexts. + # Falls back to existing priority order if networkx is unavailable or graph + # has cycles. + code_file_paths = [p.file_info.path for p in code_files] + try: + import networkx as nx # type: ignore[import] + + # Build a subgraph of just the code files we are about to generate + code_file_set = set(code_file_paths) + dag = nx.DiGraph() + dag.add_nodes_from(code_file_paths) + for path_ in code_file_paths: + if path_ in graph: + for succ in graph.successors(path_): + if succ in code_file_set: + dag.add_edge(path_, succ) # path_ depends on succ + + if nx.is_directed_acyclic_graph(dag): + # topological_sort yields nodes in an order where for each edge u→v, + # u comes before v — i.e. dependents before dependencies. + # We want leaves (dependencies) first, so reverse the order. + topo_order = list(reversed(list(nx.topological_sort(dag)))) + else: + # Cycle present: condense SCCs, topo-sort condensation, then expand. + condensation = nx.condensation(dag) + topo_order_scc = list(reversed(list(nx.topological_sort(condensation)))) + scc_members: dict[int, list[str]] = { + n: list(condensation.nodes[n]["members"]) for n in condensation.nodes + } + topo_order = [ + node for scc_id in topo_order_scc for node in scc_members[scc_id] + ] + + # Preserve priority ordering within the topo-sort by mapping paths to + # their original priority index. + priority_index = {p: i for i, p in enumerate(code_file_paths)} + topo_order = [p for p in topo_order if p in priority_index] + # Re-sort code_files to match topo_order + path_to_parsed = {p.file_info.path: p for p in code_files} + code_files = [path_to_parsed[p] for p in topo_order if p in path_to_parsed] + except Exception: + pass # Keep existing priority order on any failure + file_page_contexts: dict[str, FilePageContext] = {} level2_coros: list[tuple[str, Any]] = [] for p in code_files: + # Pre-fetch dependency summaries from vector store for deps not yet + # in the completed_page_summaries accumulator (e.g. from prior runs). + if self._vector_store is not None: + path_ = p.file_info.path + out_edges = list(graph.successors(path_)) if path_ in graph else [] + internal_deps = [e for e in out_edges if not e.startswith("external:")] + for dep in internal_deps: + if dep not in completed_page_summaries: + try: + result = await self._vector_store.get_page_summary_by_path(dep) + if result and result.get("summary"): + completed_page_summaries[dep] = result["summary"] + except Exception: + pass # Non-fatal — dep context is optional + ctx = self._assembler.assemble_file_page( p, graph, diff --git a/packages/core/src/repowise/core/generation/templates/file_page.j2 b/packages/core/src/repowise/core/generation/templates/file_page.j2 index 2a90e43..8206d8f 100644 --- a/packages/core/src/repowise/core/generation/templates/file_page.j2 +++ b/packages/core/src/repowise/core/generation/templates/file_page.j2 @@ -51,6 +51,15 @@ Module docstring: {{ ctx.docstring }} {% endfor %} {% endif %} +{% if ctx.dependency_summaries %} +## Dependency Context +The following are summaries of files that this file imports. Use these to understand how the imports are intended to be used: +{% for dep_path, dep_summary in ctx.dependency_summaries.items() %} +### `{{ dep_path }}` +{{ dep_summary }} +{% endfor %} +{% endif %} + {% if ctx.file_source_snippet %} ## Source Snippet ```{{ ctx.language }} diff --git a/packages/core/src/repowise/core/ingestion/dynamic_hints/__init__.py b/packages/core/src/repowise/core/ingestion/dynamic_hints/__init__.py new file mode 100644 index 0000000..95af349 --- /dev/null +++ b/packages/core/src/repowise/core/ingestion/dynamic_hints/__init__.py @@ -0,0 +1,6 @@ +from __future__ import annotations + +from .base import DynamicEdge, DynamicHintExtractor +from .registry import HintRegistry + +__all__ = ["HintRegistry", "DynamicEdge", "DynamicHintExtractor"] diff --git a/packages/core/src/repowise/core/ingestion/dynamic_hints/base.py b/packages/core/src/repowise/core/ingestion/dynamic_hints/base.py new file mode 100644 index 0000000..25f2541 --- /dev/null +++ b/packages/core/src/repowise/core/ingestion/dynamic_hints/base.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from pathlib import Path + + +@dataclass +class DynamicEdge: + source: str # repo-relative path + target: str # repo-relative path + edge_type: str # "dynamic_uses" | "dynamic_imports" | "url_route" + hint_source: str # extractor name + weight: float = 1.0 + + +class DynamicHintExtractor(ABC): + name: str + + @abstractmethod + def extract(self, repo_root: Path) -> list[DynamicEdge]: ... diff --git a/packages/core/src/repowise/core/ingestion/dynamic_hints/django.py b/packages/core/src/repowise/core/ingestion/dynamic_hints/django.py new file mode 100644 index 0000000..6d1e4a4 --- /dev/null +++ b/packages/core/src/repowise/core/ingestion/dynamic_hints/django.py @@ -0,0 +1,152 @@ +from __future__ import annotations + +import ast +import re +from pathlib import Path + +from .base import DynamicEdge, DynamicHintExtractor + + +def _app_to_path(app: str, repo_root: Path) -> str | None: + """Attempt to resolve a dotted app name to an __init__.py under repo_root.""" + # Try direct directory: myapp/__init__.py + direct = repo_root / app / "__init__.py" + if direct.exists(): + return str(direct.relative_to(repo_root).as_posix()) + # Try dotted path: myapp.sub → myapp/sub/__init__.py + dotted = app.replace(".", "/") + "/__init__.py" + dotted_path = repo_root / dotted + if dotted_path.exists(): + return str(dotted_path.relative_to(repo_root).as_posix()) + return None + + +def _module_to_path(module: str, repo_root: Path) -> str | None: + """Attempt to resolve a dotted module string to a .py file under repo_root.""" + as_path = module.replace(".", "/") + # Try as a .py file directly + candidate = repo_root / (as_path + ".py") + if candidate.exists(): + return str(candidate.relative_to(repo_root).as_posix()) + # Try as __init__.py inside a package + candidate = repo_root / as_path / "__init__.py" + if candidate.exists(): + return str(candidate.relative_to(repo_root).as_posix()) + return None + + +def _extract_string_list(node: ast.expr) -> list[str]: + """Extract string literals from an ast.List node.""" + results: list[str] = [] + if not isinstance(node, ast.List): + return results + for elt in node.elts: + if isinstance(elt, ast.Constant) and isinstance(elt.value, str): + results.append(elt.value) + return results + + +def _extract_string_value(node: ast.expr) -> str | None: + """Extract a string literal value from a node.""" + if isinstance(node, ast.Constant) and isinstance(node.value, str): + return node.value + return None + + +class DjangoDynamicHints(DynamicHintExtractor): + name = "django_settings" + + def extract(self, repo_root: Path) -> list[DynamicEdge]: + edges: list[DynamicEdge] = [] + edges.extend(self._scan_settings(repo_root)) + edges.extend(self._scan_urls(repo_root)) + return edges + + def _scan_settings(self, repo_root: Path) -> list[DynamicEdge]: + edges: list[DynamicEdge] = [] + + # Collect all settings files + settings_files: list[Path] = list(repo_root.rglob("settings.py")) + for settings_dir in repo_root.rglob("settings"): + if settings_dir.is_dir(): + settings_files.extend(settings_dir.glob("*.py")) + + for settings_file in settings_files: + try: + source = settings_file.read_text(encoding="utf-8", errors="ignore") + tree = ast.parse(source, filename=str(settings_file)) + except Exception: + continue + + try: + rel_settings = settings_file.relative_to(repo_root).as_posix() + except ValueError: + continue + + for node in ast.walk(tree): + if not isinstance(node, ast.Assign): + continue + for target in node.targets: + if not (isinstance(target, ast.Name)): + continue + name = target.id + + if name == "INSTALLED_APPS": + for app in _extract_string_list(node.value): + resolved = _app_to_path(app, repo_root) + if resolved: + edges.append(DynamicEdge( + source=rel_settings, + target=resolved, + edge_type="dynamic_imports", + hint_source=self.name, + )) + + elif name == "ROOT_URLCONF": + module = _extract_string_value(node.value) + if module: + resolved = _module_to_path(module, repo_root) + if resolved: + edges.append(DynamicEdge( + source=rel_settings, + target=resolved, + edge_type="dynamic_imports", + hint_source=self.name, + )) + + elif name == "MIDDLEWARE": + for middleware in _extract_string_list(node.value): + resolved = _module_to_path(middleware, repo_root) + if resolved: + edges.append(DynamicEdge( + source=rel_settings, + target=resolved, + edge_type="dynamic_imports", + hint_source=self.name, + )) + + return edges + + def _scan_urls(self, repo_root: Path) -> list[DynamicEdge]: + edges: list[DynamicEdge] = [] + include_re = re.compile(r"""include\(\s*['\"]([\w\.]+)['\"]""") + + for urls_file in repo_root.rglob("urls.py"): + try: + source = urls_file.read_text(encoding="utf-8", errors="ignore") + rel_urls = urls_file.relative_to(repo_root).as_posix() + except Exception: + continue + + for match in include_re.finditer(source): + module = match.group(1) + resolved = _module_to_path(module, repo_root) + if resolved: + edges.append(DynamicEdge( + source=rel_urls, + target=resolved, + edge_type="url_route", + hint_source=self.name, + )) + + return edges diff --git a/packages/core/src/repowise/core/ingestion/dynamic_hints/node.py b/packages/core/src/repowise/core/ingestion/dynamic_hints/node.py new file mode 100644 index 0000000..62e6e11 --- /dev/null +++ b/packages/core/src/repowise/core/ingestion/dynamic_hints/node.py @@ -0,0 +1,136 @@ +from __future__ import annotations + +import json +import re +from pathlib import Path +from typing import Any + +from .base import DynamicEdge, DynamicHintExtractor + + +def _json_loads_lenient(text: str) -> Any: + """Try json.loads; on failure, strip trailing commas and retry.""" + try: + return json.loads(text) + except json.JSONDecodeError: + cleaned = re.sub(r",\s*([}\]])", r"\1", text) + return json.loads(cleaned) + + +def _collect_export_strings(obj: Any) -> list[str]: + """Recursively collect string values from an exports object.""" + results: list[str] = [] + if isinstance(obj, str): + results.append(obj) + elif isinstance(obj, dict): + for v in obj.values(): + results.extend(_collect_export_strings(v)) + elif isinstance(obj, list): + for item in obj: + results.extend(_collect_export_strings(item)) + return results + + +class NodeDynamicHints(DynamicHintExtractor): + name = "node_package" + + def extract(self, repo_root: Path) -> list[DynamicEdge]: + edges: list[DynamicEdge] = [] + edges.extend(self._scan_package_json(repo_root)) + edges.extend(self._scan_tsconfig(repo_root)) + return edges + + def _scan_package_json(self, repo_root: Path) -> list[DynamicEdge]: + edges: list[DynamicEdge] = [] + + for pkg_file in repo_root.rglob("package.json"): + # Skip node_modules + if "node_modules" in pkg_file.parts: + continue + try: + text = pkg_file.read_text(encoding="utf-8", errors="ignore") + data = json.loads(text) + rel_pkg = pkg_file.relative_to(repo_root).as_posix() + pkg_dir = pkg_file.parent + except Exception: + continue + + # Collect entry point fields and exports strings + candidates: list[str] = [] + for field in ("main", "module", "browser"): + val = data.get(field) + if isinstance(val, str): + candidates.append(val) + + exports = data.get("exports") + if exports is not None: + candidates.extend(_collect_export_strings(exports)) + + for candidate in candidates: + if not candidate.startswith("."): + # Only resolve relative paths + continue + resolved = (pkg_dir / candidate).resolve() + try: + rel_resolved = resolved.relative_to(repo_root.resolve()).as_posix() + except ValueError: + continue + if resolved.exists(): + edges.append(DynamicEdge( + source=rel_pkg, + target=rel_resolved, + edge_type="dynamic_imports", + hint_source=self.name, + )) + + return edges + + def _scan_tsconfig(self, repo_root: Path) -> list[DynamicEdge]: + edges: list[DynamicEdge] = [] + + for tsconfig in repo_root.rglob("tsconfig*.json"): + if "node_modules" in tsconfig.parts: + continue + try: + text = tsconfig.read_text(encoding="utf-8", errors="ignore") + data = _json_loads_lenient(text) + rel_tsconfig = tsconfig.relative_to(repo_root).as_posix() + tsconfig_dir = tsconfig.parent + except Exception: + continue + + compiler_options = data.get("compilerOptions", {}) + if not isinstance(compiler_options, dict): + continue + + paths = compiler_options.get("paths") + if not isinstance(paths, dict): + continue + + base_url = compiler_options.get("baseUrl", ".") + base_dir = (tsconfig_dir / base_url).resolve() + + for _alias, targets in paths.items(): + if not isinstance(targets, list): + continue + for pattern in targets: + if not isinstance(pattern, str): + continue + # Drop trailing /* from glob patterns + clean = pattern.rstrip("/*").rstrip("/") + if not clean: + continue + resolved = (base_dir / clean).resolve() + try: + rel_resolved = resolved.relative_to(repo_root.resolve()).as_posix() + except ValueError: + continue + if resolved.exists(): + edges.append(DynamicEdge( + source=rel_tsconfig, + target=rel_resolved, + edge_type="dynamic_imports", + hint_source=self.name, + )) + + return edges diff --git a/packages/core/src/repowise/core/ingestion/dynamic_hints/pytest_hints.py b/packages/core/src/repowise/core/ingestion/dynamic_hints/pytest_hints.py new file mode 100644 index 0000000..4585fcd --- /dev/null +++ b/packages/core/src/repowise/core/ingestion/dynamic_hints/pytest_hints.py @@ -0,0 +1,103 @@ +from __future__ import annotations + +import ast +from pathlib import Path + +from .base import DynamicEdge, DynamicHintExtractor + + +def _get_fixture_names(tree: ast.AST) -> set[str]: + """Walk an AST and return the names of all @pytest.fixture / @fixture decorated functions.""" + names: set[str] = [] + for node in ast.walk(tree): + if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): + continue + for decorator in node.decorator_list: + if _is_fixture_decorator(decorator): + names.append(node.name) + break + return set(names) + + +def _is_fixture_decorator(decorator: ast.expr) -> bool: + """Return True if the decorator is @fixture or @pytest.fixture (with or without call).""" + # @fixture or @pytest.fixture + if isinstance(decorator, ast.Name) and decorator.id == "fixture": + return True + if isinstance(decorator, ast.Attribute) and decorator.attr == "fixture": + return True + # @fixture(...) or @pytest.fixture(...) + if isinstance(decorator, ast.Call): + return _is_fixture_decorator(decorator.func) + return False + + +def _get_test_function_params(tree: ast.AST) -> set[str]: + """Return all parameter names of test_* functions in the AST.""" + params: set[str] = set() + for node in ast.walk(tree): + if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): + continue + if not node.name.startswith("test_"): + continue + for arg in node.args.args + node.args.posonlyargs + node.args.kwonlyargs: + params.add(arg.arg) + if node.args.vararg: + params.add(node.args.vararg.arg) + if node.args.kwarg: + params.add(node.args.kwarg.arg) + return params + + +class PytestDynamicHints(DynamicHintExtractor): + name = "pytest_conftest" + + def extract(self, repo_root: Path) -> list[DynamicEdge]: + edges: list[DynamicEdge] = [] + + for conftest in repo_root.rglob("conftest.py"): + try: + source = conftest.read_text(encoding="utf-8", errors="ignore") + tree = ast.parse(source, filename=str(conftest)) + rel_conftest = conftest.relative_to(repo_root).as_posix() + except Exception: + continue + + fixture_names = _get_fixture_names(tree) + if not fixture_names: + continue + + conftest_dir = conftest.parent + seen_targets: set[str] = set() + + # Find all test files under the conftest's parent directory + for pattern in ("test_*.py", "*_test.py"): + for test_file in conftest_dir.rglob(pattern): + if test_file == conftest: + continue + try: + rel_test = test_file.relative_to(repo_root).as_posix() + except ValueError: + continue + + if rel_test in seen_targets: + continue + + try: + test_source = test_file.read_text(encoding="utf-8", errors="ignore") + test_tree = ast.parse(test_source, filename=str(test_file)) + except Exception: + continue + + # Check if any test function uses a fixture from this conftest + test_params = _get_test_function_params(test_tree) + if test_params & fixture_names: + seen_targets.add(rel_test) + edges.append(DynamicEdge( + source=rel_conftest, + target=rel_test, + edge_type="dynamic_uses", + hint_source=self.name, + )) + + return edges diff --git a/packages/core/src/repowise/core/ingestion/dynamic_hints/registry.py b/packages/core/src/repowise/core/ingestion/dynamic_hints/registry.py new file mode 100644 index 0000000..16b3750 --- /dev/null +++ b/packages/core/src/repowise/core/ingestion/dynamic_hints/registry.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from pathlib import Path + +import structlog + +from .base import DynamicEdge, DynamicHintExtractor +from .django import DjangoDynamicHints +from .pytest_hints import PytestDynamicHints +from .node import NodeDynamicHints + +log = structlog.get_logger(__name__) + + +class HintRegistry: + def __init__(self, extractors: list[DynamicHintExtractor] | None = None) -> None: + self._extractors = extractors or [ + DjangoDynamicHints(), + PytestDynamicHints(), + NodeDynamicHints(), + ] + + def extract_all(self, repo_root: Path) -> list[DynamicEdge]: + edges: list[DynamicEdge] = [] + for ex in self._extractors: + try: + got = ex.extract(repo_root) + edges.extend(got) + log.debug("dynamic_hints", extractor=ex.name, count=len(got)) + except Exception as e: + log.warning("dynamic_hints_failed", extractor=ex.name, error=str(e)) + return edges diff --git a/packages/core/src/repowise/core/ingestion/git_indexer.py b/packages/core/src/repowise/core/ingestion/git_indexer.py index db4a643..b1875cb 100644 --- a/packages/core/src/repowise/core/ingestion/git_indexer.py +++ b/packages/core/src/repowise/core/ingestion/git_indexer.py @@ -100,6 +100,9 @@ def _quiet_del(self: Any) -> None: # Co-change temporal decay: half-life ~125 days (lambda for exp(-t/tau)). _CO_CHANGE_DECAY_TAU: float = 180.0 +# Hotspot temporal decay: half-life for exponentially weighted churn score. +HOTSPOT_HALFLIFE_DAYS: float = 180.0 + # Regex to extract PR/MR numbers from commit messages. # Matches: "#123", "Merge pull request #456", "(#789)", "!42" (GitLab MR) _PR_NUMBER_RE = re.compile(r"(?:pull request |)\#(\d+)|\(#(\d+)\)|!(\d+)") @@ -555,6 +558,8 @@ def _index_file(self, file_path: str, repo: Any) -> dict: # Phase 3 fields "original_path": None, "merge_commit_count_90d": 0, + # Temporal hotspot score (exponentially decayed churn) + "temporal_hotspot_score": 0.0, } try: @@ -672,6 +677,18 @@ def _index_file(self, file_path: str, repo: Any) -> dict: total_churn = meta["lines_added_90d"] + meta["lines_deleted_90d"] meta["avg_commit_size"] = total_churn / c90 if c90 > 0 else 0.0 + # Temporal hotspot score: exponentially decayed per-commit churn. + # Each commit contributes weight * clamped_lines where weight decays + # with a half-life of HOTSPOT_HALFLIFE_DAYS days from now. + _ln2 = math.log(2) + temporal_score = 0.0 + for c in commits: + age_days = max((now.timestamp() - c.ts) / 86400.0, 0.0) + weight = math.exp(-_ln2 * age_days / HOTSPOT_HALFLIFE_DAYS) + lines = min((c.added + c.deleted) / 100.0, 3.0) + temporal_score += weight * lines + meta["temporal_hotspot_score"] = temporal_score + # Contributor count & bus factor meta["contributor_count"] = len(author_counts) total_commits = sum(author_counts.values()) @@ -964,14 +981,21 @@ def _flush_commit() -> None: @staticmethod def _compute_percentiles(metadata_list: list[dict]) -> None: - """Compute churn_percentile and is_hotspot. Mutates in place.""" + """Compute churn_percentile and is_hotspot. Mutates in place. + + Primary sort key is temporal_hotspot_score (exponentially decayed churn); + commit_count_90d is used as a tiebreak, matching the SQL PERCENT_RANK path. + """ if not metadata_list: return - # Sort by commit_count_90d for percentile ranking + # Sort by temporal_hotspot_score (primary) then commit_count_90d (tiebreak) sorted_by_churn = sorted( range(len(metadata_list)), - key=lambda i: metadata_list[i].get("commit_count_90d", 0), + key=lambda i: ( + metadata_list[i].get("temporal_hotspot_score") or 0.0, + metadata_list[i].get("commit_count_90d", 0), + ), ) total = len(metadata_list) diff --git a/packages/core/src/repowise/core/ingestion/graph.py b/packages/core/src/repowise/core/ingestion/graph.py index a003d64..d9d21df 100644 --- a/packages/core/src/repowise/core/ingestion/graph.py +++ b/packages/core/src/repowise/core/ingestion/graph.py @@ -500,6 +500,26 @@ def update_co_change_edges(self, updated_meta: dict, min_count: int = 3) -> None # Re-add co_changes edges self.add_co_change_edges(updated_meta, min_count) + # ------------------------------------------------------------------ + # Dynamic-hint edges + # ------------------------------------------------------------------ + + def add_dynamic_edges(self, edges: list) -> None: + """Add dynamic-hint edges to the graph. Each edge is a DynamicEdge.""" + for e in edges: + if e.source not in self._graph: + continue + if e.target not in self._graph: + # add a stub node so dead-code analysis sees it as reachable + self._graph.add_node(e.target) + self._graph.add_edge( + e.source, + e.target, + edge_type="dynamic", + hint_source=e.hint_source, + weight=e.weight, + ) + # ------------------------------------------------------------------ # Framework-aware synthetic edges # ------------------------------------------------------------------ diff --git a/packages/core/src/repowise/core/persistence/coordinator.py b/packages/core/src/repowise/core/persistence/coordinator.py new file mode 100644 index 0000000..dfbd1ce --- /dev/null +++ b/packages/core/src/repowise/core/persistence/coordinator.py @@ -0,0 +1,223 @@ +"""Atomic three-store transaction coordinator. + +Buffers writes across SQL (AsyncSession), in-memory graph (GraphBuilder / +NetworkX), and the vector store. Flushes them in order; rolls back on failure. + +Usage: + coord = AtomicStorageCoordinator(session, graph_builder, vector_store) + async with coord.transaction() as txn: + txn.add_sql(some_orm_obj) + txn.add_graph_node("path/file.py", attrs={...}) + txn.add_graph_edge("a.py", "b.py", attrs={...}) + txn.add_vector("page-id", {"path": ..., "summary": ..., "embedding": ...}) + # On normal exit: SQL commit, graph applied, vectors upserted. + # On exception anywhere: SQL rollback, graph nodes/edges removed, vector ids deleted. + +Vector store notes +------------------ +All three stores (InMemoryVectorStore, LanceDBVectorStore, PgVectorStore) share +the same async API: + - upsert: embed_and_upsert(page_id: str, text: str, metadata: dict) -> None + - delete: delete(page_id: str) -> None + - count: __len__() (InMemoryVectorStore only; others unsupported) + +The ``record`` dict passed to ``add_vector`` must contain a ``"text"`` key +(the raw text to embed). All other keys are forwarded as metadata. + +GraphBuilder notes +------------------ +The NetworkX DiGraph is stored as ``GraphBuilder._graph`` (private attribute). +This coordinator accesses it directly via ``getattr(builder, "_graph", None)`` +to avoid triggering a full ``build()`` call. +""" +from __future__ import annotations + +import asyncio +from contextlib import asynccontextmanager +from dataclasses import dataclass, field +from typing import Any, AsyncIterator +import structlog + +log = structlog.get_logger(__name__) + + +@dataclass +class _PendingTransaction: + pending_sql_objects: list[Any] = field(default_factory=list) + pending_graph_nodes: list[tuple[str, dict]] = field(default_factory=list) + pending_graph_edges: list[tuple[str, str, dict]] = field(default_factory=list) + pending_vectors: list[tuple[str, dict]] = field(default_factory=list) # (id, record) + + def add_sql(self, obj: Any) -> None: + self.pending_sql_objects.append(obj) + + def add_graph_node(self, node_id: str, attrs: dict | None = None) -> None: + self.pending_graph_nodes.append((node_id, attrs or {})) + + def add_graph_edge(self, src: str, dst: str, attrs: dict | None = None) -> None: + self.pending_graph_edges.append((src, dst, attrs or {})) + + def add_vector(self, vector_id: str, record: dict) -> None: + """Queue a vector upsert. + + ``record`` must contain a ``"text"`` key with the raw text to embed. + All remaining keys are passed as metadata to the vector store. + """ + self.pending_vectors.append((vector_id, record)) + + +class AtomicStorageCoordinator: + def __init__(self, session, graph_builder=None, vector_store=None) -> None: + self._session = session + self._graph_builder = graph_builder + self._vector_store = vector_store + self._lock = asyncio.Lock() + + @asynccontextmanager + async def transaction(self) -> AsyncIterator[_PendingTransaction]: + txn = _PendingTransaction() + applied_nodes: list[str] = [] + applied_edges: list[tuple[str, str]] = [] + applied_vector_ids: list[str] = [] + async with self._lock: + try: + yield txn + # === FLUSH === + # 1. SQL first (most likely to fail constraints) + for obj in txn.pending_sql_objects: + self._session.add(obj) + await self._session.flush() + + # 2. Graph (in-memory; track for rollback) + # Access _graph directly to avoid triggering a full build() call. + if self._graph_builder is not None: + g = getattr(self._graph_builder, "_graph", None) + if g is not None: + for node_id, attrs in txn.pending_graph_nodes: + existed = node_id in g + g.add_node(node_id, **attrs) + if not existed: + applied_nodes.append(node_id) + for src, dst, attrs in txn.pending_graph_edges: + if not g.has_edge(src, dst): + g.add_edge(src, dst, **attrs) + applied_edges.append((src, dst)) + + # 3. Vector store last + # All stores expose: embed_and_upsert(page_id, text, metadata) async + if self._vector_store is not None and txn.pending_vectors: + for vid, record in txn.pending_vectors: + await _vector_upsert(self._vector_store, vid, record) + applied_vector_ids.append(vid) + + await self._session.commit() + log.debug( + "atomic_txn_committed", + sql=len(txn.pending_sql_objects), + nodes=len(applied_nodes), + edges=len(applied_edges), + vectors=len(applied_vector_ids), + ) + except Exception as e: + log.warning("atomic_txn_failed_rollback", error=str(e)) + # SQL rollback + try: + await self._session.rollback() + except Exception as e2: + log.error("sql_rollback_failed", error=str(e2)) + # Graph rollback + if self._graph_builder is not None: + g = getattr(self._graph_builder, "_graph", None) + if g is not None: + for src, dst in applied_edges: + if g.has_edge(src, dst): + g.remove_edge(src, dst) + for node_id in applied_nodes: + if node_id in g: + g.remove_node(node_id) + # Vector rollback — all stores expose delete(page_id) async + if self._vector_store is not None: + for vid in applied_vector_ids: + try: + await _vector_delete(self._vector_store, vid) + except Exception as e2: + log.error("vector_rollback_failed", id=vid, error=str(e2)) + raise + + async def health_check(self) -> dict: + """Compare counts across stores. Returns drift report.""" + from sqlalchemy import text + report: dict = {"sql_pages": None, "vector_count": None, "graph_nodes": None, "drift": None} + try: + res = await self._session.execute(text("SELECT COUNT(*) FROM wiki_pages")) + report["sql_pages"] = int(res.scalar() or 0) + except Exception as e: + report["sql_pages_error"] = str(e) + if self._graph_builder is not None: + g = getattr(self._graph_builder, "_graph", None) + if g is not None: + report["graph_nodes"] = g.number_of_nodes() + if report["graph_nodes"] is None: + try: + res = await self._session.execute(text("SELECT COUNT(*) FROM graph_nodes")) + report["graph_nodes"] = int(res.scalar() or 0) + except Exception as e: + report["graph_nodes_error"] = str(e) + if self._vector_store is not None: + try: + report["vector_count"] = await _vector_count(self._vector_store) + except Exception as e: + report["vector_count_error"] = str(e) + # Compute drift between SQL and vector if both available + if report["sql_pages"] is not None and report["vector_count"] is not None: + denom = max(report["sql_pages"], 1) + report["drift"] = abs(report["sql_pages"] - report["vector_count"]) / denom + return report + + +# --------------------------------------------------------------------------- +# Vector store adapter helpers +# +# All three stores (InMemoryVectorStore, LanceDBVectorStore, PgVectorStore) +# share the same method names: +# upsert: embed_and_upsert(page_id: str, text: str, metadata: dict) -> None (async) +# delete: delete(page_id: str) -> None (async) +# count: __len__() (sync, InMemoryVectorStore only; others return -1) +# --------------------------------------------------------------------------- + +async def _vector_upsert(store, vid: str, record: dict) -> None: + """Call embed_and_upsert on the store. + + ``record`` must contain a ``"text"`` key. All other keys are forwarded + as metadata. Raises ValueError if ``"text"`` is absent. + """ + text = record.get("text") + if text is None: + raise ValueError( + f"_vector_upsert: record for '{vid}' is missing required 'text' key. " + f"Keys present: {list(record.keys())}" + ) + metadata = {k: v for k, v in record.items() if k != "text"} + await store.embed_and_upsert(vid, text, metadata) + + +async def _vector_delete(store, vid: str) -> None: + """Call delete(page_id) on the store.""" + await store.delete(vid) + + +async def _vector_count(store) -> int: + """Return the number of vectors in the store. + + InMemoryVectorStore exposes __len__; LanceDB and PgVector are counted by + listing page IDs (cheap on small/medium repos and avoids backend-specific SQL). + Returns -1 if no count method is available. + """ + fn = getattr(store, "__len__", None) + if fn is not None: + return int(fn()) + list_ids = getattr(store, "list_page_ids", None) + if list_ids is not None: + ids = await list_ids() + return len(ids) + return -1 diff --git a/packages/core/src/repowise/core/persistence/crud.py b/packages/core/src/repowise/core/persistence/crud.py index 00dcf64..577abb8 100644 --- a/packages/core/src/repowise/core/persistence/crud.py +++ b/packages/core/src/repowise/core/persistence/crud.py @@ -18,7 +18,7 @@ from datetime import UTC, datetime from typing import Any -from sqlalchemy import select +from sqlalchemy import select, text from sqlalchemy.ext.asyncio import AsyncSession from .models import ( @@ -709,31 +709,41 @@ async def recompute_git_percentiles( session: AsyncSession, repository_id: str, ) -> int: - """Reload all git_metadata rows and recompute churn_percentile + is_hotspot. + """Recompute churn_percentile + is_hotspot using a SQL PERCENT_RANK window function. Called after incremental updates so that percentile rankings stay fresh without a full ``repowise init``. Returns the number of rows updated. + + Primary ranking signal is temporal_hotspot_score (exponentially decayed churn); + commit_count_90d is the tiebreak. Works on both SQLite (3.25+) and PostgreSQL. """ - result = await session.execute( + # First check how many rows exist so we can return the count without an + # extra query after the UPDATE. + count_result = await session.execute( select(GitMetadata).where(GitMetadata.repository_id == repository_id) ) - rows = result.scalars().all() + rows = count_result.scalars().all() if not rows: return 0 - # Sort by 90-day commit count for percentile ranking - sorted_rows = sorted(rows, key=lambda r: r.commit_count_90d or 0) - total = len(sorted_rows) - - for rank, row in enumerate(sorted_rows): - pct = rank / total if total > 0 else 0.0 - row.churn_percentile = pct - c90 = row.commit_count_90d or 0 - row.is_hotspot = pct >= 0.75 and c90 > 0 - row.updated_at = _now_utc() - + sql = """ +WITH ranked AS ( + SELECT id, PERCENT_RANK() OVER ( + PARTITION BY repository_id + ORDER BY COALESCE(temporal_hotspot_score, 0.0), commit_count_90d + ) AS prank + FROM git_metadata + WHERE repository_id = :repo_id +) +UPDATE git_metadata +SET churn_percentile = (SELECT prank FROM ranked WHERE ranked.id = git_metadata.id), + is_hotspot = ((SELECT prank FROM ranked WHERE ranked.id = git_metadata.id) >= 0.75 + AND git_metadata.commit_count_90d > 0) +WHERE repository_id = :repo_id; +""" + await session.execute(text(sql), {"repo_id": repository_id}) await session.flush() - return total + return len(rows) # --------------------------------------------------------------------------- diff --git a/packages/core/src/repowise/core/persistence/models.py b/packages/core/src/repowise/core/persistence/models.py index 3b28500..1ab2229 100644 --- a/packages/core/src/repowise/core/persistence/models.py +++ b/packages/core/src/repowise/core/persistence/models.py @@ -299,6 +299,9 @@ class GitMetadata(Base): original_path: Mapped[str | None] = mapped_column(Text, nullable=True) merge_commit_count_90d: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + # Temporal hotspot score: exponentially time-decayed churn signal + temporal_hotspot_score: Mapped[float | None] = mapped_column(Float, nullable=True, default=0.0) + created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), nullable=False, default=_now_utc ) @@ -401,6 +404,45 @@ class ChatMessage(Base): ) +class LlmCost(Base): + """A single LLM API call cost record.""" + + __tablename__ = "llm_costs" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + repository_id: Mapped[str] = mapped_column( + String(32), ForeignKey("repositories.id", ondelete="CASCADE"), nullable=False + ) + ts: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, default=_now_utc + ) + model: Mapped[str] = mapped_column(String(100), nullable=False) + operation: Mapped[str] = mapped_column(String(50), nullable=False) + input_tokens: Mapped[int] = mapped_column(Integer, nullable=False) + output_tokens: Mapped[int] = mapped_column(Integer, nullable=False) + cost_usd: Mapped[float] = mapped_column(Float, nullable=False) + file_path: Mapped[str | None] = mapped_column(String(1024), nullable=True) + + +class SecurityFinding(Base): + """A security signal detected during file ingestion.""" + + __tablename__ = "security_findings" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + repository_id: Mapped[str] = mapped_column( + String(32), ForeignKey("repositories.id", ondelete="CASCADE"), nullable=False + ) + file_path: Mapped[str] = mapped_column(String(1024), nullable=False) + kind: Mapped[str] = mapped_column(String(100), nullable=False) + severity: Mapped[str] = mapped_column(String(20), nullable=False) + snippet: Mapped[str | None] = mapped_column(Text, nullable=True) + line_number: Mapped[int | None] = mapped_column(Integer, nullable=True) + detected_at: Mapped[datetime] = mapped_column( + DateTime(timezone=True), nullable=False, default=_now_utc + ) + + class DeadCodeFinding(Base): """Dead code finding: unreachable files, unused exports, zombie packages.""" diff --git a/packages/core/src/repowise/core/persistence/vector_store.py b/packages/core/src/repowise/core/persistence/vector_store.py index dce05c0..14e431b 100644 --- a/packages/core/src/repowise/core/persistence/vector_store.py +++ b/packages/core/src/repowise/core/persistence/vector_store.py @@ -76,6 +76,14 @@ async def list_page_ids(self) -> set[str]: """ return set() # default: empty (subclasses should override) + async def get_page_summary_by_path(self, path: str) -> dict | None: + """Return {'summary': str, 'key_exports': list[str]} for a previously-indexed page, or None. + + Used for RAG context injection during doc generation: when generating page B + that imports A, we fetch A's previously-generated summary and feed it to the LLM. + """ + return None # default: no-op (subclasses should override) + # --------------------------------------------------------------------------- # InMemoryVectorStore @@ -145,6 +153,23 @@ async def close(self) -> None: async def list_page_ids(self) -> set[str]: return set(self._store.keys()) + async def get_page_summary_by_path(self, path: str) -> dict | None: + """Return {'summary': str, 'key_exports': list[str]} for a previously-indexed page, or None. + + Used for RAG context injection during doc generation: when generating page B + that imports A, we fetch A's previously-generated summary and feed it to the LLM. + + Implementation note: reads 'summary' from metadata if present (set by the + generation pipeline), else falls back to the first 500 chars of 'content'. + 'key_exports' reads the 'exports' metadata field if present, else []. + """ + for _pid, (_, meta) in self._store.items(): + if meta.get("target_path") == path: + summary = meta.get("summary") or str(meta.get("content", ""))[:500] + key_exports = meta.get("exports") or [] + return {"summary": summary, "key_exports": list(key_exports)} + return None + def __len__(self) -> int: return len(self._store) @@ -294,6 +319,38 @@ async def list_page_ids(self) -> set[str]: rows = await self._table.query().select(["page_id"]).to_list() # type: ignore[union-attr] return {r["page_id"] for r in rows} + async def get_page_summary_by_path(self, path: str) -> dict | None: + """Return {'summary': str, 'key_exports': list[str]} for a previously-indexed page, or None. + + Used for RAG context injection during doc generation: when generating page B + that imports A, we fetch A's previously-generated summary and feed it to the LLM. + + Implementation note: LanceDB stores up to 200 chars of content in 'content_snippet'. + We use that as the summary. 'key_exports' is not stored in the LanceDB schema, so + we return [] — the caller only uses the text summary for prompt injection. + """ + await self._ensure_connected() + if self._table is None: + return None + + safe_path = path.replace("'", "''") + try: + rows = ( + await self._table.query() # type: ignore[union-attr] + .where(f"target_path = '{safe_path}'") + .select(["content_snippet"]) + .limit(1) + .to_list() + ) + except Exception: + return None + + if not rows: + return None + + summary = rows[0].get("content_snippet") or "" + return {"summary": str(summary), "key_exports": []} + # --------------------------------------------------------------------------- # PgVectorStore @@ -390,3 +447,44 @@ async def list_page_ids(self) -> set[str]: sa_text("SELECT id FROM wiki_pages WHERE embedding IS NOT NULL") ) return {r[0] for r in rows.fetchall()} + + async def get_page_summary_by_path(self, path: str) -> dict | None: + """Return {'summary': str, 'key_exports': list[str]} for a previously-indexed page, or None. + + Used for RAG context injection during doc generation: when generating page B + that imports A, we fetch A's previously-generated summary and feed it to the LLM. + + Implementation note: reads the 'content' column (first 500 chars) from the + wiki_pages table matched by target_path. 'key_exports' is derived from the + page's exports if stored in a metadata JSON column; otherwise returns []. + """ + from sqlalchemy.sql import text as sa_text + + async with self._session_factory() as session: + rows = await session.execute( + sa_text( + "SELECT content, metadata FROM wiki_pages " + "WHERE target_path = :path " + "LIMIT 1" + ), + {"path": path}, + ) + row = rows.fetchone() + + if row is None: + return None + + content = str(row[0] or "")[:500] + # Extract key_exports from metadata JSON column if present + key_exports: list[str] = [] + if row[1] and isinstance(row[1], dict): + key_exports = list(row[1].get("exports", [])) + elif row[1] and isinstance(row[1], str): + import json + try: + meta = json.loads(row[1]) + key_exports = list(meta.get("exports", [])) + except (json.JSONDecodeError, AttributeError): + pass + + return {"summary": content, "key_exports": key_exports} diff --git a/packages/core/src/repowise/core/pipeline/orchestrator.py b/packages/core/src/repowise/core/pipeline/orchestrator.py index dfae3d1..a7c390c 100644 --- a/packages/core/src/repowise/core/pipeline/orchestrator.py +++ b/packages/core/src/repowise/core/pipeline/orchestrator.py @@ -15,8 +15,9 @@ from __future__ import annotations import asyncio +import os import time -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor from dataclasses import dataclass, field from pathlib import Path from typing import Any @@ -27,6 +28,35 @@ logger = structlog.get_logger(__name__) + +# --------------------------------------------------------------------------- +# Process-pool worker (module-level — must be picklable) +# --------------------------------------------------------------------------- + +# Module-level process-local parser cache (one per worker process). +_WORKER_PARSER: Any = None + + +def _parse_one(path_and_fi_and_bytes: tuple) -> Any: + """Worker function for ProcessPoolExecutor parsing. + + Constructs (or reuses) a process-local ASTParser and parses one file. + Returns a ParsedFile on success, or (abs_path_str, error_str) on failure. + The parser is constructed lazily inside the worker — the ASTParser itself + (which holds compiled tree-sitter Language/Query objects) is never pickled. + Only FileInfo (input) and ParsedFile (output) cross the process boundary; + both are plain dataclasses and therefore picklable. + """ + global _WORKER_PARSER + fi, source = path_and_fi_and_bytes + try: + if _WORKER_PARSER is None: + from repowise.core.ingestion import ASTParser + _WORKER_PARSER = ASTParser() + return _WORKER_PARSER.parse_file(fi, source) + except Exception as exc: + return (fi.abs_path, str(exc)) + # Maximum seconds to spend on decision extraction before giving up. # Large repos with tens of thousands of files can take arbitrarily long. DECISION_EXTRACTION_TIMEOUT_SECS = 300 @@ -111,6 +141,7 @@ async def run_pipeline( test_run: bool = False, resume: bool = False, progress: ProgressCallback | None = None, + cost_tracker: Any | None = None, ) -> PipelineResult: """Run the repowise indexing/analysis/generation pipeline. @@ -156,28 +187,47 @@ async def run_pipeline( commit_depth = max(1, min(commit_depth, 5000)) + # Attach cost tracker to provider if supplied + if cost_tracker is not None and llm_client is not None and hasattr(llm_client, "_cost_tracker"): + llm_client._cost_tracker = cost_tracker + # ---- Phase 1: Ingestion ------------------------------------------------ if progress: progress.on_message("info", "Phase 1: Ingestion") - parsed_files, file_infos, repo_structure, source_map, graph_builder = ( - await _run_ingestion( + # Launch git indexing as a background task immediately — it is independent + # of parsing and graph-build, so the two stages can run concurrently. + # _run_ingestion does: traverse → ProcessPool parse → graph build → dynamic hints. + # _run_git_indexing does: git log → co-change accumulation (I/O bound, own executor). + + async def _git_stage() -> tuple: + return await _run_git_indexing( + repo_path, + commit_depth=commit_depth, + follow_renames=follow_renames, + progress=progress, + ) + + async def _ingestion_stage() -> tuple: + return await _run_ingestion( repo_path, exclude_patterns=exclude_patterns, skip_tests=skip_tests, skip_infra=skip_infra, progress=progress, ) - ) - # Git indexing (runs concurrently with ingestion in the CLI, but here - # we start it after traversal since we're already async) - git_summary, git_metadata_list, git_meta_map = await _run_git_indexing( - repo_path, - commit_depth=commit_depth, - follow_renames=follow_renames, - progress=progress, - ) + ( + parsed_files, + file_infos, + repo_structure, + source_map, + graph_builder, + ), ( + git_summary, + git_metadata_list, + git_meta_map, + ) = await asyncio.gather(_ingestion_stage(), _git_stage()) # Add co-change edges to the graph if git_meta_map: @@ -335,31 +385,77 @@ async def _run_ingestion( if fi.language not in ("dockerfile", "makefile", "terraform", "shell") ] - # Parse (sequential — GraphBuilder is not thread-safe) + # ---- Parse phase: CPU-bound, run in ProcessPoolExecutor ---------------- if progress: progress.on_phase_start("parse", len(file_infos)) - parser = ASTParser() + # Read source bytes up front (I/O, sequential — fast enough; keeps worker + # args small: FileInfo + bytes, both picklable plain dataclasses/bytes). + fi_and_bytes: list[tuple] = [] + for fi in file_infos: + try: + source = Path(fi.abs_path).read_bytes() + fi_and_bytes.append((fi, source)) + except Exception: + if progress: + progress.on_item_done("parse") + parsed_files: list[Any] = [] source_map: dict[str, bytes] = {} graph_builder = GraphBuilder(repo_path=repo_path) - for i, fi in enumerate(file_infos): - try: - source = Path(fi.abs_path).read_bytes() - parsed = parser.parse_file(fi, source) - parsed_files.append(parsed) + loop = asyncio.get_running_loop() + workers = max(1, os.cpu_count() or 4) + + _use_process_pool = True + parse_results: list[Any] = [] + + try: + with ProcessPoolExecutor(max_workers=workers) as pool: + tasks = [ + loop.run_in_executor(pool, _parse_one, item) + for item in fi_and_bytes + ] + # Use as_completed via asyncio.as_completed to report per-file progress. + # We need to preserve (task → fi_and_bytes index) for source_map so we + # wrap tasks in a list and drain with gather instead. + parse_results = await asyncio.gather(*tasks, return_exceptions=True) + except Exception as pool_exc: + logger.warning( + "process_pool_parse_failed_falling_back", + error=str(pool_exc), + ) + _use_process_pool = False + # Fallback: in-process sequential parse + _fallback_parser = ASTParser() + for i, (fi, source) in enumerate(fi_and_bytes): + try: + result = _fallback_parser.parse_file(fi, source) + parse_results.append(result) + except Exception as exc: + parse_results.append((fi.abs_path, str(exc))) + if progress: + progress.on_item_done("parse") + if i % 50 == 49: + await asyncio.sleep(0) + + # Aggregate results into GraphBuilder on the main loop (not thread-safe). + for idx, result in enumerate(parse_results): + fi, source = fi_and_bytes[idx] + if isinstance(result, tuple) and len(result) == 2 and isinstance(result[1], str): + # Error tuple: (abs_path_str, error_str) + logger.debug("parse_error_in_worker", path=result[0], error=result[1]) + elif isinstance(result, Exception): + logger.debug("parse_exception_in_worker", path=fi.abs_path, error=str(result)) + else: + parsed_files.append(result) source_map[fi.path] = source - graph_builder.add_file(parsed) - except Exception: - pass # skip unparseable files - if progress: + graph_builder.add_file(result) + # Report per-file progress if we used the process pool (fallback already reported above). + if _use_process_pool and progress: progress.on_item_done("parse") - # Yield control every 50 files so the event loop stays responsive - if i % 50 == 49: - await asyncio.sleep(0) - # Build graph (CPU-bound — run in thread to keep event loop free) + # ---- Graph build phase ------------------------------------------------- if progress: progress.on_phase_start("graph", 1) await asyncio.to_thread(graph_builder.build) @@ -373,6 +469,17 @@ async def _run_ingestion( except Exception: pass # framework edge detection is best-effort + # ---- Dynamic hints wiring (after static graph is fully built) ---------- + try: + from repowise.core.ingestion.dynamic_hints import HintRegistry + + registry = HintRegistry() + dynamic_edges = await loop.run_in_executor(None, registry.extract_all, repo_path) + graph_builder.add_dynamic_edges(dynamic_edges) + logger.info("dynamic_hints_added", count=len(dynamic_edges)) + except Exception as hints_exc: + logger.warning("dynamic_hints_failed", error=str(hints_exc)) + if progress: progress.on_item_done("graph") @@ -533,6 +640,7 @@ async def run_generation( concurrency: int, progress: ProgressCallback | None, resume: bool = False, + cost_tracker: Any | None = None, ) -> list[Any]: """Run LLM-powered page generation. @@ -547,6 +655,10 @@ async def run_generation( from repowise.core.persistence.vector_store import InMemoryVectorStore from repowise.core.providers.embedding.base import MockEmbedder + # Attach cost tracker to LLM client if available + if cost_tracker is not None and llm_client is not None and hasattr(llm_client, "_cost_tracker"): + llm_client._cost_tracker = cost_tracker + config = GenerationConfig(max_concurrency=concurrency) assembler = ContextAssembler(config) @@ -573,6 +685,9 @@ def on_page_done(page_type: str) -> None: _pages_done += 1 if progress: progress.on_item_done("generation") + # Push live cost update if the callback supports it + if cost_tracker is not None and hasattr(progress, "set_cost"): + progress.set_cost(cost_tracker.session_cost) if progress: progress.on_phase_start("generation", None) diff --git a/packages/core/src/repowise/core/pipeline/persist.py b/packages/core/src/repowise/core/pipeline/persist.py index 78af265..8c702ac 100644 --- a/packages/core/src/repowise/core/pipeline/persist.py +++ b/packages/core/src/repowise/core/pipeline/persist.py @@ -108,6 +108,25 @@ async def persist_pipeline_result( if all_symbols: await batch_upsert_symbols(session, repo_id, all_symbols) + # ---- Security scan ------------------------------------------------------- + # Choice: persist.py (rather than orchestrator.py) because there is already + # a clear per-file loop over parsed_files here, and the instructions ask for + # a minimal, non-invasive addition. The orchestrator parse stage is owned + # by another agent and must not be touched. + try: + from repowise.core.analysis.security_scan import SecurityScanner + + scanner = SecurityScanner(session, repo_id) + for pf in result.parsed_files: + source_text = getattr(pf.file_info, "content", "") or "" + findings = await scanner.scan_file( + pf.file_info.path, source_text, pf.symbols + ) + if findings: + await scanner.persist(pf.file_info.path, findings) + except Exception as _sec_err: # noqa: BLE001 — scanner must never break the pipeline + logger.warning("security_scan_skipped", error=str(_sec_err)) + # ---- Git metadata -------------------------------------------------------- if result.git_metadata_list: await upsert_git_metadata_bulk(session, repo_id, result.git_metadata_list) diff --git a/packages/core/src/repowise/core/providers/llm/anthropic.py b/packages/core/src/repowise/core/providers/llm/anthropic.py index 8d9b0b2..1322100 100644 --- a/packages/core/src/repowise/core/providers/llm/anthropic.py +++ b/packages/core/src/repowise/core/providers/llm/anthropic.py @@ -36,9 +36,12 @@ RateLimitError, ) -from typing import Any, AsyncIterator +from typing import TYPE_CHECKING, Any, AsyncIterator from repowise.core.rate_limiter import RateLimiter +if TYPE_CHECKING: + from repowise.core.generation.cost_tracker import CostTracker + log = structlog.get_logger(__name__) _MAX_RETRIES = 3 @@ -61,6 +64,7 @@ def __init__( api_key: str | None = None, model: str = "claude-sonnet-4-6", rate_limiter: RateLimiter | None = None, + cost_tracker: CostTracker | None = None, ) -> None: resolved_key = api_key or os.environ.get("ANTHROPIC_API_KEY") if not resolved_key: @@ -71,6 +75,7 @@ def __init__( self._client = AsyncAnthropic(api_key=resolved_key) self._model = model self._rate_limiter = rate_limiter + self._cost_tracker = cost_tracker @property def provider_name(self) -> str: @@ -164,6 +169,23 @@ async def _generate_with_retry( cached_tokens=result.cached_tokens, request_id=request_id, ) + + if self._cost_tracker is not None: + import asyncio + + try: + asyncio.get_event_loop().create_task( + self._cost_tracker.record( + model=self._model, + input_tokens=result.input_tokens, + output_tokens=result.output_tokens, + operation="doc_generation", + file_path=None, + ) + ) + except RuntimeError: + pass # No running event loop — skip async record + return result # --- ChatProvider protocol implementation --- diff --git a/packages/core/src/repowise/core/providers/llm/gemini.py b/packages/core/src/repowise/core/providers/llm/gemini.py index 7398c7a..46503dc 100644 --- a/packages/core/src/repowise/core/providers/llm/gemini.py +++ b/packages/core/src/repowise/core/providers/llm/gemini.py @@ -36,9 +36,12 @@ RateLimitError, ) -from typing import Any, AsyncIterator +from typing import TYPE_CHECKING, Any, AsyncIterator from repowise.core.rate_limiter import RateLimiter +if TYPE_CHECKING: + from repowise.core.generation.cost_tracker import CostTracker + log = structlog.get_logger(__name__) _MAX_RETRIES = 3 @@ -53,6 +56,7 @@ class GeminiProvider(BaseProvider): model: Gemini model name. Defaults to gemini-3.1-flash-lite-preview. api_key: Google API key. Falls back to GEMINI_API_KEY or GOOGLE_API_KEY env var. rate_limiter: Optional RateLimiter instance. + cost_tracker: Optional CostTracker for recording token usage and cost. """ def __init__( @@ -60,6 +64,7 @@ def __init__( model: str = "gemini-3.1-flash-lite-preview", api_key: str | None = None, rate_limiter: RateLimiter | None = None, + cost_tracker: "CostTracker | None" = None, ) -> None: self._model = model self._api_key = ( @@ -73,6 +78,7 @@ def __init__( "No API key found. Pass api_key= or set GEMINI_API_KEY / GOOGLE_API_KEY env var.", ) self._rate_limiter = rate_limiter + self._cost_tracker = cost_tracker self._client: object | None = None # cached; created once on first call @property @@ -185,6 +191,21 @@ def _call_sync() -> GeneratedResponse: output_tokens=result.output_tokens, request_id=request_id, ) + + if self._cost_tracker is not None: + try: + asyncio.get_event_loop().create_task( + self._cost_tracker.record( + model=self._model, + input_tokens=result.input_tokens, + output_tokens=result.output_tokens, + operation="doc_generation", + file_path=None, + ) + ) + except RuntimeError: + pass # No running event loop — skip async record + return result # --- ChatProvider protocol implementation --- diff --git a/packages/core/src/repowise/core/providers/llm/litellm.py b/packages/core/src/repowise/core/providers/llm/litellm.py index 0692cd9..1273e22 100644 --- a/packages/core/src/repowise/core/providers/llm/litellm.py +++ b/packages/core/src/repowise/core/providers/llm/litellm.py @@ -37,9 +37,12 @@ RateLimitError, ) -from typing import Any, AsyncIterator +from typing import TYPE_CHECKING, Any, AsyncIterator from repowise.core.rate_limiter import RateLimiter +if TYPE_CHECKING: + from repowise.core.generation.cost_tracker import CostTracker + log = structlog.get_logger(__name__) _MAX_RETRIES = 3 @@ -64,11 +67,13 @@ def __init__( api_key: str | None = None, api_base: str | None = None, rate_limiter: RateLimiter | None = None, + cost_tracker: "CostTracker | None" = None, ) -> None: self._model = model self._api_key = api_key self._api_base = api_base self._rate_limiter = rate_limiter + self._cost_tracker = cost_tracker @property def provider_name(self) -> str: @@ -162,6 +167,23 @@ async def _generate_with_retry( output_tokens=result.output_tokens, request_id=request_id, ) + + if self._cost_tracker is not None: + import asyncio + + try: + asyncio.get_event_loop().create_task( + self._cost_tracker.record( + model=self._model, + input_tokens=result.input_tokens, + output_tokens=result.output_tokens, + operation="doc_generation", + file_path=None, + ) + ) + except RuntimeError: + pass # No running event loop — skip async record + return result # --- ChatProvider protocol implementation --- diff --git a/packages/core/src/repowise/core/providers/llm/openai.py b/packages/core/src/repowise/core/providers/llm/openai.py index b13248b..1cc3e7b 100644 --- a/packages/core/src/repowise/core/providers/llm/openai.py +++ b/packages/core/src/repowise/core/providers/llm/openai.py @@ -35,9 +35,12 @@ RateLimitError, ) -from typing import Any, AsyncIterator +from typing import TYPE_CHECKING, Any, AsyncIterator from repowise.core.rate_limiter import RateLimiter +if TYPE_CHECKING: + from repowise.core.generation.cost_tracker import CostTracker + log = structlog.get_logger(__name__) _MAX_RETRIES = 3 @@ -61,6 +64,7 @@ def __init__( model: str = "gpt-5.4-nano", base_url: str | None = None, rate_limiter: RateLimiter | None = None, + cost_tracker: "CostTracker | None" = None, ) -> None: resolved_key = api_key or os.environ.get("OPENAI_API_KEY") if not resolved_key: @@ -71,6 +75,7 @@ def __init__( self._client = AsyncOpenAI(api_key=resolved_key, base_url=base_url) self._model = model self._rate_limiter = rate_limiter + self._cost_tracker = cost_tracker @property def provider_name(self) -> str: @@ -161,6 +166,23 @@ async def _generate_with_retry( output_tokens=result.output_tokens, request_id=request_id, ) + + if self._cost_tracker is not None: + import asyncio + + try: + asyncio.get_event_loop().create_task( + self._cost_tracker.record( + model=self._model, + input_tokens=result.input_tokens, + output_tokens=result.output_tokens, + operation="doc_generation", + file_path=None, + ) + ) + except RuntimeError: + pass # No running event loop — skip async record + return result # --- ChatProvider protocol implementation --- diff --git a/packages/server/src/repowise/server/__init__.py b/packages/server/src/repowise/server/__init__.py index e384019..a97e29e 100644 --- a/packages/server/src/repowise/server/__init__.py +++ b/packages/server/src/repowise/server/__init__.py @@ -7,4 +7,4 @@ - Background job scheduler (APScheduler) """ -__version__ = "0.1.31" +__version__ = "0.2.0" diff --git a/packages/server/src/repowise/server/app.py b/packages/server/src/repowise/server/app.py index ddefcda..14ef79f 100644 --- a/packages/server/src/repowise/server/app.py +++ b/packages/server/src/repowise/server/app.py @@ -28,18 +28,22 @@ from repowise.core.providers.embedding.base import MockEmbedder from repowise.server import __version__ from repowise.server.routers import ( + blast_radius, chat, claude_md, + costs, dead_code, decisions, git, graph, health, jobs, + knowledge_map, pages, providers, repos, search, + security, symbols, webhooks, ) @@ -187,5 +191,9 @@ async def bad_request_handler(request: Request, exc: ValueError) -> JSONResponse app.include_router(decisions.router) app.include_router(chat.router) app.include_router(providers.router) + app.include_router(costs.router) + app.include_router(security.router) + app.include_router(blast_radius.router) + app.include_router(knowledge_map.router) return app diff --git a/packages/server/src/repowise/server/mcp_server/tool_dead_code.py b/packages/server/src/repowise/server/mcp_server/tool_dead_code.py index 8596233..364b104 100644 --- a/packages/server/src/repowise/server/mcp_server/tool_dead_code.py +++ b/packages/server/src/repowise/server/mcp_server/tool_dead_code.py @@ -28,6 +28,10 @@ async def get_dead_code( directory: str | None = None, owner: str | None = None, group_by: str | None = None, + include_internals: bool = False, + include_zombie_packages: bool = True, + no_unreachable: bool = False, + no_unused_exports: bool = False, ) -> dict: """Get a tiered refactor plan for dead and unused code. @@ -39,16 +43,39 @@ async def get_dead_code( group_by="owner" to see who owns the most dead code. Use tier to focus on a single confidence band. + Scan scope flags (mirror the DeadCodeAnalyzer.analyze config): + - Use ``min_confidence=0.7`` for high-confidence cleanups — filters out + speculative findings and surfaces only code with zero references that + hasn't been touched in months. Ideal before a release or refactor sprint. + - Use ``include_internals=True`` for aggressive scans of private symbols + (functions/variables prefixed with _ or declared without exports). This + has a higher false-positive rate and is off by default; enable it when + doing a thorough dead-code purge of a stable, well-tested module. + - Use ``no_unreachable=True`` to skip file-level reachability checks and + focus only on symbol-level findings (unused exports/internals). + - Use ``no_unused_exports=True`` to skip public-export checks, e.g. when + you know the repo exposes a public API consumed externally. + - Use ``include_zombie_packages=False`` to suppress monorepo package + findings, useful in repos where cross-package imports are intentionally + absent during development. + Args: repo: Repository path, name, or ID. kind: Filter by kind (unreachable_file, unused_export, unused_internal, zombie_package). - min_confidence: Minimum confidence threshold (default 0.5). + min_confidence: Minimum confidence threshold (default 0.5). Use 0.7 for high-confidence + cleanups only. safe_only: Only return findings marked safe_to_delete (default false). limit: Maximum findings per tier (default 20). tier: Focus on a single tier: "high" (>=0.8), "medium" (0.5-0.8), or "low" (<0.5). directory: Filter findings to a specific directory prefix. owner: Filter findings by primary owner name. group_by: "directory" for per-directory rollup, "owner" for ownership hotspots. + include_internals: Include unused private/internal symbol findings (default false). + Enable for aggressive scans of private symbols. + include_zombie_packages: Include zombie-package findings for monorepo packages with + no external importers (default true). + no_unreachable: Suppress unreachable-file findings (default false). + no_unused_exports: Suppress unused-export findings (default false). """ async with get_session(_state._session_factory) as session: repository = await _get_repo(session, repo) @@ -73,10 +100,23 @@ async def get_dead_code( ) git_meta_map = {g.file_path: g for g in git_res.scalars().all()} + # --- Build excluded kinds from scope flags --- + _excluded_kinds: set[str] = set() + if no_unreachable: + _excluded_kinds.add("unreachable_file") + if no_unused_exports: + _excluded_kinds.add("unused_export") + if not include_internals: + _excluded_kinds.add("unused_internal") + if not include_zombie_packages: + _excluded_kinds.add("zombie_package") + # --- Apply filters --- filtered = all_findings if kind: filtered = [f for f in filtered if f.kind == kind] + elif _excluded_kinds: + filtered = [f for f in filtered if f.kind not in _excluded_kinds] if safe_only: filtered = [f for f in filtered if f.safe_to_delete] if min_confidence > 0: diff --git a/packages/server/src/repowise/server/mcp_server/tool_overview.py b/packages/server/src/repowise/server/mcp_server/tool_overview.py index 75ad0d1..56198d9 100644 --- a/packages/server/src/repowise/server/mcp_server/tool_overview.py +++ b/packages/server/src/repowise/server/mcp_server/tool_overview.py @@ -16,6 +16,7 @@ from repowise.server.mcp_server import _state from repowise.server.mcp_server._helpers import _get_repo from repowise.server.mcp_server._server import mcp +from repowise.server.services.knowledge_map import compute_knowledge_map @mcp.tool() @@ -117,6 +118,18 @@ async def get_overview(repo: str | None = None) -> dict: "top_churn_modules": top_modules, } + # B. Knowledge map ------------------------------------------------------- + knowledge_map = await compute_knowledge_map(session, repository.id) + # Flatten onboarding_targets to a list of paths (MCP tool backward compat) + if knowledge_map and "onboarding_targets" in knowledge_map: + knowledge_map = dict(knowledge_map) + knowledge_map["onboarding_targets"] = [ + t["path"] for t in knowledge_map["onboarding_targets"] + ] + knowledge_map["knowledge_silos"] = [ + s["file_path"] for s in knowledge_map["knowledge_silos"] + ] + return { "title": overview_page.title if overview_page else repository.name, "content_md": overview_page.content if overview_page else "No overview generated yet.", @@ -135,4 +148,5 @@ async def get_overview(repo: str | None = None) -> dict: ], "entry_points": [n.node_id for n in entry_nodes], "git_health": git_health, + "knowledge_map": knowledge_map, } diff --git a/packages/server/src/repowise/server/mcp_server/tool_risk.py b/packages/server/src/repowise/server/mcp_server/tool_risk.py index f8cae89..7cad1a7 100644 --- a/packages/server/src/repowise/server/mcp_server/tool_risk.py +++ b/packages/server/src/repowise/server/mcp_server/tool_risk.py @@ -11,6 +11,8 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import text + from repowise.core.persistence.database import get_session from repowise.core.persistence.models import ( GitMetadata, @@ -127,6 +129,48 @@ def _compute_impact_surface( return ranked[:3] +async def _check_test_gap(session: AsyncSession, repo_id: str, target: str) -> bool: + """Return True if no test file corresponding to *target* exists in graph_nodes.""" + import os + + base = os.path.splitext(os.path.basename(target))[0] + ext = os.path.splitext(target)[1].lstrip(".") + # Build a LIKE pattern broad enough to catch test_, _test, .spec.* + patterns = [f"%test_{base}%", f"%{base}_test%", f"%{base}.spec.{ext}%"] + for pat in patterns: + row = await session.execute( + select(GraphNode).where( + GraphNode.repository_id == repo_id, + GraphNode.is_test == True, # noqa: E712 + GraphNode.node_id.like(pat), + ).limit(1) + ) + if row.scalar_one_or_none() is not None: + return False + return True + + +async def _get_security_signals( + session: AsyncSession, repo_id: str, target: str +) -> list[dict]: + """Fetch stored security findings for *target* from security_findings table.""" + try: + rows = await session.execute( + text( + "SELECT kind, severity, snippet FROM security_findings " + "WHERE repository_id = :repo_id AND file_path = :fp " + "ORDER BY severity DESC, kind" + ), + {"repo_id": repo_id, "fp": target}, + ) + return [ + {"kind": r[0], "severity": r[1], "snippet": r[2]} + for r in rows.all() + ] + except Exception: # noqa: BLE001 — table may not exist pre-migration + return [] + + async def _assess_one_target( session: AsyncSession, repository: Repository, @@ -136,7 +180,12 @@ async def _assess_one_target( reverse_deps: dict[str, set[str]], node_meta: dict[str, Any], ) -> dict: - """Assess risk for a single target file.""" + """Assess risk for a single target file. + + Enriches each result with: + - test_gap: bool — True when no test file matching this file's basename exists. + - security_signals: list of {kind, severity, snippet} from security_findings. + """ repo_id = repository.id result_data: dict[str, Any] = {"target": target} @@ -164,6 +213,8 @@ async def _assess_one_target( reverse_deps, node_meta, ) + result_data["test_gap"] = await _check_test_gap(session, repo_id, target) + result_data["security_signals"] = await _get_security_signals(session, repo_id, target) result_data["risk_summary"] = f"{target} — no git metadata available" return result_data @@ -240,6 +291,10 @@ async def _assess_one_target( if merge_commit_count > 0: result_data["merge_commit_count_90d"] = merge_commit_count + # C. Test gaps + security signals + result_data["test_gap"] = await _check_test_gap(session, repo_id, target) + result_data["security_signals"] = await _get_security_signals(session, repo_id, target) + capped = getattr(meta, "commit_count_capped", False) capped_note = " (history truncated — actual count may be higher)" if capped else "" result_data["commit_count_capped"] = capped @@ -262,6 +317,7 @@ async def _assess_one_target( async def get_risk( targets: list[str], repo: str | None = None, + changed_files: list[str] | None = None, ) -> dict: """Assess modification risk for one or more files before making changes. @@ -270,14 +326,26 @@ async def get_risk( - risk_type ("churn-heavy"/"bug-prone"/"high-coupling"/"stable") - impact_surface: top 3 critical modules that would break - dependents, co-change partners, ownership + - test_gap: bool — True if no test file exists for this file + - security_signals: list of {kind, severity, snippet} from static analysis Plus the top 5 global hotspots for ambient awareness. - Example: get_risk(["src/auth/service.py", "src/auth/middleware.py"]) + Pass ``changed_files`` for PR review / blast radius analysis. When provided, + the response includes an additional ``pr_blast_radius`` key containing: + - direct_risks: per-file risk score (centrality × temporal hotspot) + - transitive_affected: files that import any changed file (up to depth 3) + - cochange_warnings: historical co-change partners missing from the PR + - recommended_reviewers: top 5 owners of affected files + - test_gaps: changed/affected files lacking a corresponding test + - overall_risk_score: 0-10 composite score + + Example: get_risk(["src/auth/service.py"], changed_files=["src/auth/service.py"]) Args: - targets: List of file paths to assess. + targets: List of file paths to assess (standard per-file risk). repo: Repository path, name, or ID. + changed_files: Optional list of files changed in a PR for blast-radius analysis. """ async with get_session(_state._session_factory) as session: repository = await _get_repo(session, repo) @@ -343,7 +411,18 @@ async def get_risk( if h.file_path not in target_set ][:5] - return { + # A. PR blast radius (only when caller passes changed_files) + pr_blast_radius: dict | None = None + if changed_files: + from repowise.core.analysis.pr_blast import PRBlastRadiusAnalyzer + + analyzer = PRBlastRadiusAnalyzer(session, repo_id) + pr_blast_radius = await analyzer.analyze_files(changed_files) + + response: dict = { "targets": {r["target"]: r for r in results}, "global_hotspots": global_hotspots, } + if pr_blast_radius is not None: + response["pr_blast_radius"] = pr_blast_radius + return response diff --git a/packages/server/src/repowise/server/routers/blast_radius.py b/packages/server/src/repowise/server/routers/blast_radius.py new file mode 100644 index 0000000..f9066a2 --- /dev/null +++ b/packages/server/src/repowise/server/routers/blast_radius.py @@ -0,0 +1,36 @@ +"""/api/repos/{repo_id}/blast-radius — PR blast radius analysis endpoint.""" + +from __future__ import annotations + +from sqlalchemy.ext.asyncio import AsyncSession + +from fastapi import APIRouter, Depends +from repowise.core.analysis.pr_blast import PRBlastRadiusAnalyzer +from repowise.server.deps import get_db_session, verify_api_key +from repowise.server.schemas import BlastRadiusRequest, BlastRadiusResponse + +router = APIRouter( + prefix="/api/repos", + tags=["blast-radius"], + dependencies=[Depends(verify_api_key)], +) + + +@router.post("/{repo_id}/blast-radius", response_model=BlastRadiusResponse) +async def analyze_blast_radius( + repo_id: str, + body: BlastRadiusRequest, + session: AsyncSession = Depends(get_db_session), # noqa: B008 +) -> BlastRadiusResponse: + """Compute blast radius for a proposed PR given its changed files. + + Returns direct risk scores per file, transitively affected files (BFS up to + max_depth), historical co-change warnings, recommended reviewers, test gaps, + and an overall risk score (0–10). + """ + analyzer = PRBlastRadiusAnalyzer(session=session, repo_id=repo_id) + result = await analyzer.analyze_files( + changed_files=body.changed_files, + max_depth=body.max_depth, + ) + return BlastRadiusResponse(**result) diff --git a/packages/server/src/repowise/server/routers/costs.py b/packages/server/src/repowise/server/routers/costs.py new file mode 100644 index 0000000..92f8ca1 --- /dev/null +++ b/packages/server/src/repowise/server/routers/costs.py @@ -0,0 +1,111 @@ +"""/api/repos/{repo_id}/costs — LLM cost tracking endpoints.""" + +from __future__ import annotations + +from datetime import datetime, date + +import sqlalchemy as sa +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from fastapi import APIRouter, Depends, Query +from repowise.core.persistence.models import LlmCost +from repowise.server.deps import get_db_session, verify_api_key +from repowise.server.schemas import CostGroupResponse, CostSummaryResponse + +router = APIRouter( + prefix="/api/repos", + tags=["costs"], + dependencies=[Depends(verify_api_key)], +) + + +def _parse_since(since: str | None) -> datetime | None: + """Parse an ISO date string (YYYY-MM-DD) into a datetime, or return None.""" + if since is None: + return None + try: + return datetime.fromisoformat(since) + except ValueError: + # Try date-only format + return datetime.combine(date.fromisoformat(since), datetime.min.time()) + + +@router.get("/{repo_id}/costs/summary", response_model=CostSummaryResponse) +async def get_cost_summary( + repo_id: str, + since: str | None = Query(None, description="ISO date filter, e.g. 2025-01-01"), + session: AsyncSession = Depends(get_db_session), # noqa: B008 +) -> CostSummaryResponse: + """Return aggregate cost totals for a repository.""" + since_dt = _parse_since(since) + + stmt = sa.select( + sa.func.count().label("calls"), + sa.func.sum(LlmCost.input_tokens).label("input_tokens"), + sa.func.sum(LlmCost.output_tokens).label("output_tokens"), + sa.func.sum(LlmCost.cost_usd).label("cost_usd"), + ).where(LlmCost.repository_id == repo_id) + + if since_dt is not None: + stmt = stmt.where(LlmCost.ts >= since_dt) + + result = await session.execute(stmt) + row = result.one() + + return CostSummaryResponse( + total_cost_usd=row.cost_usd or 0.0, + total_calls=row.calls or 0, + total_input_tokens=row.input_tokens or 0, + total_output_tokens=row.output_tokens or 0, + since=since, + ) + + +@router.get("/{repo_id}/costs", response_model=list[CostGroupResponse]) +async def list_costs( + repo_id: str, + since: str | None = Query(None, description="ISO date filter, e.g. 2025-01-01"), + by: str = Query("day", description="Grouping dimension: operation | model | day"), + session: AsyncSession = Depends(get_db_session), # noqa: B008 +) -> list[CostGroupResponse]: + """Return grouped cost totals for a repository.""" + since_dt = _parse_since(since) + + if by == "model": + group_col = LlmCost.model + elif by == "day": + group_col = sa.func.strftime("%Y-%m-%d", LlmCost.ts) + else: + # Default: operation + group_col = LlmCost.operation + + stmt = ( + sa.select( + group_col.label("group"), + sa.func.count().label("calls"), + sa.func.sum(LlmCost.input_tokens).label("input_tokens"), + sa.func.sum(LlmCost.output_tokens).label("output_tokens"), + sa.func.sum(LlmCost.cost_usd).label("cost_usd"), + ) + .where(LlmCost.repository_id == repo_id) + .group_by(group_col) + .order_by(sa.func.sum(LlmCost.cost_usd).desc()) + ) + + if since_dt is not None: + stmt = stmt.where(LlmCost.ts >= since_dt) + + result = await session.execute(stmt) + rows = result.fetchall() + + return [ + CostGroupResponse( + group=row.group or "(unknown)", + calls=row.calls or 0, + input_tokens=row.input_tokens or 0, + output_tokens=row.output_tokens or 0, + cost_usd=row.cost_usd or 0.0, + ) + for row in rows + ] diff --git a/packages/server/src/repowise/server/routers/git.py b/packages/server/src/repowise/server/routers/git.py index 57e80fe..1d3cdd4 100644 --- a/packages/server/src/repowise/server/routers/git.py +++ b/packages/server/src/repowise/server/routers/git.py @@ -11,6 +11,7 @@ from repowise.core.persistence import crud from repowise.core.persistence.models import GitMetadata from repowise.server.deps import get_db_session, verify_api_key +from repowise.server.mcp_server.tool_risk import _check_test_gap from repowise.server.schemas import ( GitMetadataResponse, GitSummaryResponse, @@ -35,7 +36,9 @@ async def get_git_metadata( meta = await crud.get_git_metadata(session, repo_id, file_path) if meta is None: raise HTTPException(status_code=404, detail="Git metadata not found") - return GitMetadataResponse.from_orm(meta) + response = GitMetadataResponse.from_orm(meta) + response.test_gap = await _check_test_gap(session, repo_id, file_path) + return response @router.get("/{repo_id}/hotspots", response_model=list[HotspotResponse]) @@ -51,7 +54,10 @@ async def get_hotspots( GitMetadata.repository_id == repo_id, GitMetadata.is_hotspot.is_(True), ) - .order_by(GitMetadata.churn_percentile.desc()) + .order_by( + GitMetadata.temporal_hotspot_score.desc().nulls_last(), + GitMetadata.churn_percentile.desc(), + ) .limit(limit) ) rows = result.scalars().all() @@ -61,6 +67,7 @@ async def get_hotspots( commit_count_90d=r.commit_count_90d, commit_count_30d=r.commit_count_30d, churn_percentile=r.churn_percentile, + temporal_hotspot_score=r.temporal_hotspot_score, primary_owner=r.primary_owner_name, is_hotspot=r.is_hotspot, is_stable=r.is_stable, diff --git a/packages/server/src/repowise/server/routers/health.py b/packages/server/src/repowise/server/routers/health.py index 84afa33..caca4a1 100644 --- a/packages/server/src/repowise/server/routers/health.py +++ b/packages/server/src/repowise/server/routers/health.py @@ -6,13 +6,16 @@ from __future__ import annotations from sqlalchemy import func, select +from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.sql import text -from fastapi import APIRouter, Request +from fastapi import APIRouter, Depends, Request +from repowise.core.persistence.coordinator import AtomicStorageCoordinator from repowise.core.persistence.database import get_session from repowise.core.persistence.models import GenerationJob, Page from repowise.server import __version__ -from repowise.server.schemas import HealthResponse +from repowise.server.deps import get_db_session, get_vector_store +from repowise.server.schemas import CoordinatorHealthResponse, HealthResponse router = APIRouter(tags=["health"]) @@ -74,3 +77,50 @@ async def metrics(request: Request) -> str: from starlette.responses import Response return Response(content="\n".join(lines) + "\n", media_type="text/plain") + + +_repo_health_router = APIRouter(prefix="/api/repos", tags=["health"]) + + +@_repo_health_router.get("/{repo_id}/health/coordinator", response_model=CoordinatorHealthResponse) +async def coordinator_health( + repo_id: str, + session: AsyncSession = Depends(get_db_session), # noqa: B008 + vector_store=Depends(get_vector_store), # noqa: B008 +) -> CoordinatorHealthResponse: + """Return coordinator drift health for a repository.""" + coord = AtomicStorageCoordinator(session, graph_builder=None, vector_store=vector_store) + result = await coord.health_check() + + sql_pages: int | None = result.get("sql_pages") + vector_count: int | None = result.get("vector_count") + graph_nodes: int | None = result.get("graph_nodes") + drift: float | None = result.get("drift") + + # Normalise vector_count: -1 means unsupported (return None) + if vector_count == -1: + vector_count = None + + # Drift percentage (0–100) + drift_pct: float | None = round(drift * 100, 2) if drift is not None else None + + if drift_pct is None: + status = "ok" + elif drift_pct <= 1.0: + status = "ok" + elif drift_pct <= 5.0: + status = "warning" + else: + status = "critical" + + return CoordinatorHealthResponse( + sql_pages=sql_pages, + vector_count=vector_count, + graph_nodes=graph_nodes, + drift_pct=drift_pct, + status=status, + ) + + +# Merge repo-scoped routes into the main router so they are registered together. +router.include_router(_repo_health_router) diff --git a/packages/server/src/repowise/server/routers/jobs.py b/packages/server/src/repowise/server/routers/jobs.py index b8edfbf..cf57763 100644 --- a/packages/server/src/repowise/server/routers/jobs.py +++ b/packages/server/src/repowise/server/routers/jobs.py @@ -5,14 +5,14 @@ import asyncio import json -from sqlalchemy import select +from sqlalchemy import func, select from sqlalchemy.ext.asyncio import AsyncSession from starlette.responses import StreamingResponse from fastapi import APIRouter, Depends, HTTPException, Query, Request from repowise.core.persistence import crud from repowise.core.persistence.database import get_session -from repowise.core.persistence.models import GenerationJob +from repowise.core.persistence.models import GenerationJob, LlmCost from repowise.server.deps import get_db_session, verify_api_key from repowise.server.schemas import JobResponse @@ -79,6 +79,16 @@ async def event_generator(): yield f"event: error\ndata: {data}\n\n" return + # Sum LLM costs recorded since the job started + actual_cost_usd: float | None = None + if job.started_at is not None: + cost_q = select(func.sum(LlmCost.cost_usd)).where( + LlmCost.repository_id == job.repository_id, + LlmCost.ts >= job.started_at, + ) + async with get_session(factory) as cost_session: + actual_cost_usd = await cost_session.scalar(cost_q) + progress = { "job_id": job.id, "status": job.status, @@ -86,6 +96,7 @@ async def event_generator(): "total_pages": job.total_pages, "failed_pages": job.failed_pages, "current_level": job.current_level, + "actual_cost_usd": actual_cost_usd, } data = json.dumps(progress) yield f"event: progress\ndata: {data}\n\n" diff --git a/packages/server/src/repowise/server/routers/knowledge_map.py b/packages/server/src/repowise/server/routers/knowledge_map.py new file mode 100644 index 0000000..6931f63 --- /dev/null +++ b/packages/server/src/repowise/server/routers/knowledge_map.py @@ -0,0 +1,46 @@ +"""/api/repos/{repo_id}/knowledge-map — Knowledge map endpoint.""" + +from __future__ import annotations + +from sqlalchemy.ext.asyncio import AsyncSession + +from fastapi import APIRouter, Depends, HTTPException +from repowise.server.deps import get_db_session, verify_api_key +from repowise.server.schemas import ( + KnowledgeMapOwner, + KnowledgeMapResponse, + KnowledgeMapSilo, + KnowledgeMapTarget, +) +from repowise.server.services.knowledge_map import compute_knowledge_map + +router = APIRouter( + prefix="/api/repos", + tags=["knowledge-map"], + dependencies=[Depends(verify_api_key)], +) + + +@router.get("/{repo_id}/knowledge-map", response_model=KnowledgeMapResponse) +async def get_knowledge_map( + repo_id: str, + session: AsyncSession = Depends(get_db_session), # noqa: B008 +) -> KnowledgeMapResponse: + """Return knowledge-map data for a repository. + + Includes top owners by file count, knowledge silos (files with >80 % + single-owner concentration), and onboarding targets (high-centrality + files with least documentation). + """ + data = await compute_knowledge_map(session, repo_id) + if not data: + raise HTTPException( + status_code=404, + detail="No git metadata found for this repository. Run an index first.", + ) + + return KnowledgeMapResponse( + top_owners=[KnowledgeMapOwner(**o) for o in data["top_owners"]], + knowledge_silos=[KnowledgeMapSilo(**s) for s in data["knowledge_silos"]], + onboarding_targets=[KnowledgeMapTarget(**t) for t in data["onboarding_targets"]], + ) diff --git a/packages/server/src/repowise/server/routers/security.py b/packages/server/src/repowise/server/routers/security.py new file mode 100644 index 0000000..cdd76a9 --- /dev/null +++ b/packages/server/src/repowise/server/routers/security.py @@ -0,0 +1,52 @@ +"""/api/repos/{repo_id}/security — Security findings endpoints.""" + +from __future__ import annotations + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from fastapi import APIRouter, Depends, Query +from repowise.core.persistence.models import SecurityFinding +from repowise.server.deps import get_db_session, verify_api_key +from repowise.server.schemas import SecurityFindingResponse + +router = APIRouter( + prefix="/api/repos", + tags=["security"], + dependencies=[Depends(verify_api_key)], +) + + +@router.get("/{repo_id}/security", response_model=list[SecurityFindingResponse]) +async def list_security_findings( + repo_id: str, + file_path: str | None = Query(None, description="Filter by relative file path"), + severity: str | None = Query(None, description="Filter by severity: high, med, or low"), + limit: int = Query(100, ge=1, le=500), + session: AsyncSession = Depends(get_db_session), # noqa: B008 +) -> list[SecurityFindingResponse]: + """List security findings for a repository, with optional filters.""" + stmt = select(SecurityFinding).where(SecurityFinding.repository_id == repo_id) + + if file_path is not None: + stmt = stmt.where(SecurityFinding.file_path == file_path) + + if severity is not None: + stmt = stmt.where(SecurityFinding.severity == severity) + + stmt = stmt.order_by(SecurityFinding.detected_at.desc()).limit(limit) + + result = await session.execute(stmt) + rows = result.scalars().all() + + return [ + SecurityFindingResponse( + id=row.id, + file_path=row.file_path, + kind=row.kind, + severity=row.severity, + snippet=row.snippet, + detected_at=row.detected_at, + ) + for row in rows + ] diff --git a/packages/server/src/repowise/server/schemas.py b/packages/server/src/repowise/server/schemas.py index bac0bfb..26451af 100644 --- a/packages/server/src/repowise/server/schemas.py +++ b/packages/server/src/repowise/server/schemas.py @@ -317,6 +317,7 @@ class GitMetadataResponse(BaseModel): avg_commit_size: float commit_categories: dict merge_commit_count_90d: int + test_gap: bool | None = None @classmethod def from_orm(cls, obj: object) -> GitMetadataResponse: @@ -356,6 +357,7 @@ class HotspotResponse(BaseModel): commit_count_90d: int commit_count_30d: int churn_percentile: float + temporal_hotspot_score: float | None = None primary_owner: str | None is_hotspot: bool is_stable: bool @@ -433,6 +435,20 @@ class DeadCodeSummaryResponse(BaseModel): by_kind: dict +# --------------------------------------------------------------------------- +# Security +# --------------------------------------------------------------------------- + + +class SecurityFindingResponse(BaseModel): + id: int + file_path: str + kind: str + severity: str + snippet: str | None + detected_at: datetime + + # --------------------------------------------------------------------------- # Repo Stats # --------------------------------------------------------------------------- @@ -539,6 +555,49 @@ class HotFilesGraphResponse(BaseModel): links: list[GraphEdgeResponse] +# --------------------------------------------------------------------------- +# Blast Radius +# --------------------------------------------------------------------------- + + +class BlastRadiusRequest(BaseModel): + changed_files: list[str] + max_depth: int = Field(default=3, ge=1, le=10) + + +class DirectRiskEntry(BaseModel): + path: str + risk_score: float + temporal_hotspot: float + centrality: float + + +class TransitiveEntry(BaseModel): + path: str + depth: int + + +class CochangeWarning(BaseModel): + changed: str + missing_partner: str + score: float + + +class ReviewerEntry(BaseModel): + email: str + files: int + ownership_pct: float + + +class BlastRadiusResponse(BaseModel): + direct_risks: list[DirectRiskEntry] + transitive_affected: list[TransitiveEntry] + cochange_warnings: list[CochangeWarning] + recommended_reviewers: list[ReviewerEntry] + test_gaps: list[str] + overall_risk_score: float + + # --------------------------------------------------------------------------- # Health # --------------------------------------------------------------------------- @@ -550,6 +609,44 @@ class HealthResponse(BaseModel): version: str +class CoordinatorHealthResponse(BaseModel): + sql_pages: int | None + vector_count: int | None + graph_nodes: int | None + drift_pct: float | None + status: str # "ok" | "warning" | "critical" + + +# --------------------------------------------------------------------------- +# Knowledge Map +# --------------------------------------------------------------------------- + + +class KnowledgeMapOwner(BaseModel): + email: str + name: str + files_owned: int + percentage: float + + +class KnowledgeMapSilo(BaseModel): + file_path: str + owner_email: str + owner_pct: float + + +class KnowledgeMapTarget(BaseModel): + path: str + pagerank: float + doc_words: int + + +class KnowledgeMapResponse(BaseModel): + top_owners: list[KnowledgeMapOwner] + knowledge_silos: list[KnowledgeMapSilo] + onboarding_targets: list[KnowledgeMapTarget] + + # --------------------------------------------------------------------------- # Decisions # --------------------------------------------------------------------------- @@ -691,3 +788,24 @@ class SetActiveProviderRequest(BaseModel): class SetApiKeyRequest(BaseModel): api_key: str + + +# --------------------------------------------------------------------------- +# Cost Tracking +# --------------------------------------------------------------------------- + + +class CostGroupResponse(BaseModel): + group: str + calls: int + input_tokens: int + output_tokens: int + cost_usd: float + + +class CostSummaryResponse(BaseModel): + total_cost_usd: float + total_calls: int + total_input_tokens: int + total_output_tokens: int + since: str | None diff --git a/packages/server/src/repowise/server/services/__init__.py b/packages/server/src/repowise/server/services/__init__.py new file mode 100644 index 0000000..d99cd3e --- /dev/null +++ b/packages/server/src/repowise/server/services/__init__.py @@ -0,0 +1 @@ +"""repowise server services.""" diff --git a/packages/server/src/repowise/server/services/knowledge_map.py b/packages/server/src/repowise/server/services/knowledge_map.py new file mode 100644 index 0000000..6bb8c83 --- /dev/null +++ b/packages/server/src/repowise/server/services/knowledge_map.py @@ -0,0 +1,102 @@ +"""Shared logic for computing the knowledge map for a repository.""" + +from __future__ import annotations + +from collections import defaultdict +from typing import Any + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from repowise.core.persistence.models import GitMetadata, GraphNode, Page + + +async def compute_knowledge_map(session: AsyncSession, repo_id: str) -> dict[str, Any]: + """Return knowledge-map data for *repo_id*. + + Returns a dict with keys: + top_owners — list of {email, name, files_owned, percentage} + knowledge_silos — list of {file_path, owner_email, owner_pct} + onboarding_targets — list of {path, pagerank, doc_words} + + Returns an empty dict when no git metadata is available. + """ + git_res = await session.execute( + select(GitMetadata).where(GitMetadata.repository_id == repo_id) + ) + all_git = git_res.scalars().all() + + if not all_git: + return {} + + # top_owners: aggregate primary_owner_email across all files + owner_file_count: dict[str, int] = defaultdict(int) + owner_name_map: dict[str, str] = {} + for g in all_git: + email = g.primary_owner_email or "" + if email: + owner_file_count[email] += 1 + if g.primary_owner_name: + owner_name_map[email] = g.primary_owner_name + + total_files = len(all_git) or 1 + top_owners = sorted( + [ + { + "email": email, + "name": owner_name_map.get(email, ""), + "files_owned": count, + "percentage": round(count / total_files * 100.0, 1), + } + for email, count in owner_file_count.items() + ], + key=lambda x: -x["files_owned"], + )[:10] + + # knowledge_silos: files where primary owner has > 80% ownership + knowledge_silos = [ + { + "file_path": g.file_path, + "owner_email": g.primary_owner_email or "", + "owner_pct": round(float(g.primary_owner_commit_pct or 0.0), 3), + } + for g in all_git + if (g.primary_owner_commit_pct or 0.0) > 0.8 + ] + + # onboarding_targets: high-centrality files with fewest docs + node_result = await session.execute( + select(GraphNode).where( + GraphNode.repository_id == repo_id, + GraphNode.is_test == False, # noqa: E712 + ) + ) + all_nodes = node_result.scalars().all() + + page_result = await session.execute( + select(Page).where( + Page.repository_id == repo_id, + Page.page_type == "file_page", + ) + ) + doc_words: dict[str, int] = { + p.target_path: len(p.content.split()) for p in page_result.scalars().all() + } + + candidates = [ + { + "path": n.node_id, + "pagerank": n.pagerank, + "doc_words": doc_words.get(n.node_id, 0), + } + for n in all_nodes + if n.pagerank > 0.0 + ] + candidates.sort(key=lambda x: (x["doc_words"], -x["pagerank"])) + onboarding_targets = candidates[:10] + + return { + "top_owners": top_owners, + "knowledge_silos": knowledge_silos, + "onboarding_targets": onboarding_targets, + } diff --git a/packages/web/src/app/repos/[id]/blast-radius/page.tsx b/packages/web/src/app/repos/[id]/blast-radius/page.tsx new file mode 100644 index 0000000..77c3a7d --- /dev/null +++ b/packages/web/src/app/repos/[id]/blast-radius/page.tsx @@ -0,0 +1,429 @@ +"use client"; + +import { useState } from "react"; +import useSWR from "swr"; +import { useParams } from "next/navigation"; +import { Radar, Plus, Flame } from "lucide-react"; +import { Button } from "@/components/ui/button"; +import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; +import { cn } from "@/lib/utils/cn"; +import { + analyzeBlastRadius, + type BlastRadiusResponse, + type DirectRiskEntry, + type TransitiveEntry, + type CochangeWarning, + type ReviewerEntry, +} from "@/lib/api/blast-radius"; +import { getHotspots } from "@/lib/api/git"; + +// --------------------------------------------------------------------------- +// Risk score gauge card +// --------------------------------------------------------------------------- + +function RiskScoreCard({ score }: { score: number }) { + const color = + score >= 7 + ? "text-red-500 border-red-500/30 bg-red-500/5" + : score >= 4 + ? "text-amber-500 border-amber-500/30 bg-amber-500/5" + : "text-emerald-500 border-emerald-500/30 bg-emerald-500/5"; + const label = score >= 7 ? "High Risk" : score >= 4 ? "Medium Risk" : "Low Risk"; + + return ( + + + + {score.toFixed(1)} + + {label} + Overall Risk Score (0–10) + + + ); +} + +// --------------------------------------------------------------------------- +// Simple table helpers +// --------------------------------------------------------------------------- + +function TableSection({ + title, + children, + empty, +}: { + title: string; + children: React.ReactNode; + empty: boolean; +}) { + return ( + + + {title} + + + {empty ? ( +

None

+ ) : ( + children + )} +
+
+ ); +} + +function Th({ children }: { children: React.ReactNode }) { + return ( + + {children} + + ); +} + +function Td({ children, className }: { children: React.ReactNode; className?: string }) { + return ( + + {children} + + ); +} + +// --------------------------------------------------------------------------- +// Result tables +// --------------------------------------------------------------------------- + +function DirectRisksTable({ rows }: { rows: DirectRiskEntry[] }) { + return ( +
+ + + + + + + + + + + {rows.map((r) => ( + + + + + + + ))} + +
FileRisk ScoreTemporal HotspotCentrality
+ {r.path} + {r.risk_score.toFixed(4)}{r.temporal_hotspot.toFixed(4)}{r.centrality.toFixed(6)}
+
+ ); +} + +function TransitiveTable({ rows }: { rows: TransitiveEntry[] }) { + return ( +
+ + + + + + + + + {rows.map((r) => ( + + + + + ))} + +
FileDepth
+ {r.path} + {r.depth}
+
+ ); +} + +function CochangeTable({ rows }: { rows: CochangeWarning[] }) { + return ( +
+ + + + + + + + + + {rows.map((r, i) => ( + + + + + + ))} + +
Changed FileMissing PartnerCo-change Count
+ {r.changed} + + {r.missing_partner} + {r.score}
+
+ ); +} + +function ReviewersTable({ rows }: { rows: ReviewerEntry[] }) { + return ( +
+ + + + + + + + + + {rows.map((r) => ( + + + + + + ))} + +
EmailFiles OwnedAvg Ownership %
{r.email}{r.files}{(r.ownership_pct * 100).toFixed(1)}%
+
+ ); +} + +function TestGapsList({ gaps }: { gaps: string[] }) { + return ( +
    + {gaps.map((g) => ( +
  • + {g} +
  • + ))} +
+ ); +} + +// --------------------------------------------------------------------------- +// Page +// --------------------------------------------------------------------------- + +export default function BlastRadiusPage() { + const params = useParams<{ id: string }>(); + const repoId = params.id; + + const [files, setFiles] = useState(""); + const [maxDepth, setMaxDepth] = useState(3); + const [loading, setLoading] = useState(false); + const [error, setError] = useState(null); + const [result, setResult] = useState(null); + + // Suggestions: top 8 hotspots so users can prefill with one click instead + // of remembering paths. Falls back gracefully if the call fails. + const { data: hotspotSuggestions } = useSWR( + repoId ? ["blast-radius-suggestions", repoId] : null, + () => getHotspots(repoId, 8), + ); + + const addSuggestion = (path: string) => { + setFiles((prev) => { + const lines = prev.split("\n").map((l) => l.trim()).filter(Boolean); + if (lines.includes(path)) return prev; + return [...lines, path].join("\n"); + }); + }; + + const useAllHotspots = () => { + if (!hotspotSuggestions) return; + setFiles(hotspotSuggestions.map((h) => h.file_path).join("\n")); + }; + + const clearFiles = () => setFiles(""); + + const handleAnalyze = async () => { + const changedFiles = files + .split("\n") + .map((l) => l.trim()) + .filter(Boolean); + + if (changedFiles.length === 0) { + setError("Enter at least one file path."); + return; + } + + setLoading(true); + setError(null); + setResult(null); + + try { + const data = await analyzeBlastRadius(repoId, { + changed_files: changedFiles, + max_depth: maxDepth, + }); + setResult(data); + } catch (err) { + setError(err instanceof Error ? err.message : "Analysis failed."); + } finally { + setLoading(false); + } + }; + + return ( +
+ {/* Header */} +
+

+ + Blast Radius +

+

+ Estimate the impact of a proposed PR — direct risks, transitive effects, reviewer + suggestions, and test gaps. +

+
+ + {/* Input form */} + + + Changed Files + + +

+ Paste a list of file paths (one per line) — typically the files in your PR diff. + Don't know what to try? Click a hotspot below to prefill, or use{" "} + + . +

+ + {hotspotSuggestions && hotspotSuggestions.length > 0 && ( +
+ {hotspotSuggestions.map((h) => ( + + ))} +
+ )} + +