diff --git a/.claude/CLAUDE.md b/.claude/CLAUDE.md index 2d2c852..f6d83b7 100644 --- a/.claude/CLAUDE.md +++ b/.claude/CLAUDE.md @@ -12,6 +12,9 @@ Read-only caching tier for org and engineering data. Materializes people, teams, ## Commands ```bash uv run scripts/build_db.py --force # rebuild database +uv run scripts/fetch_pricing.py # fetch cloud pricing → data/pricing.db +uv run scripts/fetch_github.py --org YOUR_ORG # fetch GitHub org data → data/github.db +uv run scripts/refresh_catalog.py --refresh # fetch fresh data from configured sources scripts/test.sh # run test suite uv run mcp_server.py # start MCP server (stdio) uv run mcp_server.py --http # start MCP server (HTTP :8000) diff --git a/.gitignore b/.gitignore index c2cb4f2..e69ec5f 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,9 @@ data/*.db-shm # Track example data !data/acme-* +# Local extensions (not tracked upstream) +extensions/ + # Credentials .env diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0d268ba..5f22aba 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -8,8 +8,12 @@ repos: - repo: local hooks: - id: no-secrets - name: No secrets in commits - entry: bash -c 'if grep -rn -E "(API_TOKEN|API_SECRET|PASSWORD|Bearer |client_secret)" "$@" 2>/dev/null; then echo "Potential secret found!"; exit 1; fi' - language: system + name: Check for secrets + entry: scripts/check-no-secrets.sh + language: script types: [text] - exclude: ^(\.env\.example|\.pre-commit-config\.yaml|\.gitignore) + - id: protected-files + name: Check protected files + entry: scripts/check-protected-files.sh + language: script + pass_filenames: false diff --git a/mcp_server.py b/mcp_server.py index 88a92d6..8923308 100644 --- a/mcp_server.py +++ b/mcp_server.py @@ -16,7 +16,9 @@ """ import argparse +import importlib import json +import logging import os import re import sqlite3 @@ -44,6 +46,9 @@ "component mappings, scrum team boards, and governance documents with sub-ms latency. 
" "Use list_scrum_team_boards for scrum team staffing data (FTE counts by role). " "Use list_documents/get_document/get_document_section for governance docs. " + "Use cloud_pricing_lookup for AWS/Claude pricing data. " + "Use get_direct_reports/get_org_tree/get_management_chain for org hierarchy queries. " + "Use github_* tools or query with github.* prefix for GitHub data. " "All data is read-only." ), ) @@ -77,6 +82,11 @@ def _get_conn() -> sqlite3.Connection: conn.execute("PRAGMA mmap_size = 268435456") conn.execute("PRAGMA cache_size = -64000") conn.execute("PRAGMA temp_store = MEMORY") + # Attach optional databases if they exist + for db_name in ("pricing", "github"): + db_path = DB_PATH.parent / f"{db_name}.db" + if db_path.exists(): + conn.execute(f"ATTACH DATABASE 'file:{db_path}?mode=ro' AS {db_name}") _conn = conn return conn @@ -85,6 +95,36 @@ def _rows_to_dicts(rows: list[sqlite3.Row]) -> list[dict]: return [dict(r) for r in rows] +def _db_attached(conn: sqlite3.Connection, name: str) -> bool: + """Check if a database is attached.""" + try: + conn.execute(f"SELECT 1 FROM {name}.sqlite_master LIMIT 1") + return True + except sqlite3.OperationalError: + return False + + +def _attached_db_schema(db_name: str, build_hint: str) -> str: + """Return schema DDL + row counts for an attached database.""" + conn = _get_conn() + if not _db_attached(conn, db_name): + return f"{db_name}.db is not attached. 
Run: {build_hint}" + lines = [f"-- {db_name.upper()} TABLES\n"] + tables = conn.execute( + f"SELECT name, sql FROM {db_name}.sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name" + ).fetchall() + for t in tables: + count = conn.execute(f"SELECT COUNT(*) FROM {db_name}.[{t['name']}]").fetchone()[0] + lines.append(f"-- {t['name']}: {count:,} rows") + lines.append(t["sql"] + ";\n") + views = conn.execute(f"SELECT name, sql FROM {db_name}.sqlite_master WHERE type='view' ORDER BY name").fetchall() + if views: + lines.append(f"\n-- {db_name.upper()} VIEWS\n") + for v in views: + lines.append(v["sql"] + ";\n") + return "\n".join(lines) + + def _normalize_release(release: str) -> tuple[str, str]: """Extract (product, major.minor) from release strings in various formats. @@ -194,6 +234,24 @@ def catalog_resource() -> str: return "DATA_CATALOG.yaml not found." +@mcp.resource( + "gps://pricing-schema", + name="Cloud Pricing Database Schema", + description="DDL and row counts for pricing.db (when available)", +) +def pricing_schema_resource() -> str: + return _attached_db_schema("pricing", "uv run scripts/fetch_pricing.py") + + +@mcp.resource( + "gps://github-schema", + name="GitHub Database Schema", + description="DDL and row counts for github.db (when available)", +) +def github_schema_resource() -> str: + return _attached_db_schema("github", "uv run scripts/fetch_github.py --org YOUR_ORG") + + # --------------------------------------------------------------------------- # Tools # --------------------------------------------------------------------------- @@ -348,6 +406,121 @@ def list_scrum_team_boards(organization: str | None = None) -> str: return json.dumps({"teams": teams, "count": len(teams)}, default=str) +# --------------------------------------------------------------------------- +# Org hierarchy tools +# --------------------------------------------------------------------------- + + +@mcp.tool( + annotations={"readOnlyHint": True, 
"openWorldHint": False}, +) +def get_direct_reports(uid: str) -> str: + """List all people who directly report to the given user ID. + + Returns each direct report with their full person detail (name, title, email, + location, org, teams). Use this to answer 'who reports to X' questions. + """ + conn = _get_conn() + sql = """ + SELECT + p.person_id, p.name, p.user_id, p.manager, p.manager_uid, + o.org_name, o.org_key, s.specialty_name AS specialty, + p.job_title, p.email, p.location, p.status, p.source, p.last_modified, + GROUP_CONCAT(DISTINCT jc.component_name) AS components, + GROUP_CONCAT(DISTINCT mt.miro_team_name) AS miro_teams, + GROUP_CONCAT(DISTINCT st.team_name) AS scrum_teams + FROM person p + LEFT JOIN org o ON p.org_id = o.org_id + LEFT JOIN specialty s ON p.specialty_id = s.specialty_id + LEFT JOIN person_component pc ON p.person_id = pc.person_id + LEFT JOIN jira_component jc ON pc.component_id = jc.component_id + LEFT JOIN person_miro_team pmt ON p.person_id = pmt.person_id + LEFT JOIN miro_team mt ON pmt.miro_team_id = mt.miro_team_id + LEFT JOIN person_scrum_team pst ON p.person_id = pst.person_id + LEFT JOIN scrum_team st ON pst.team_id = st.team_id + WHERE p.manager_uid = ? + GROUP BY p.person_id + ORDER BY p.name + """ + rows = conn.execute(sql, (uid,)).fetchall() + return json.dumps( + {"manager_uid": uid, "direct_reports": _rows_to_dicts(rows), "count": len(rows)}, + default=str, + ) + + +@mcp.tool( + annotations={"readOnlyHint": True, "openWorldHint": False}, +) +def get_org_tree(uid: str, max_depth: int = 5) -> str: + """Get the full organizational tree under a person, recursively. + + Walks down through manager_uid relationships. Returns a flattened list of + all people in the org tree with their depth level. Use this to answer + 'who is in X's org' or 'list everyone under VP xyz'. + max_depth limits how deep to recurse (default 5). 
+ """ + conn = _get_conn() + sql = """ + WITH RECURSIVE org_tree AS ( + SELECT user_id, name, job_title, email, location, manager_uid, 0 AS depth + FROM person + WHERE user_id = ? + UNION ALL + SELECT p.user_id, p.name, p.job_title, p.email, p.location, p.manager_uid, ot.depth + 1 + FROM person p + JOIN org_tree ot ON p.manager_uid = ot.user_id + WHERE ot.depth < ? + ) + SELECT user_id, name, job_title, email, location, manager_uid, depth + FROM org_tree + ORDER BY depth, name + """ + rows = conn.execute(sql, (uid, max_depth)).fetchall() + cols = ["user_id", "name", "job_title", "email", "location", "manager_uid", "depth"] + results = [dict(zip(cols, row)) for row in rows] + return json.dumps( + {"root_uid": uid, "max_depth": max_depth, "people": results, "count": len(results)}, + default=str, + ) + + +@mcp.tool( + annotations={"readOnlyHint": True, "openWorldHint": False}, +) +def get_management_chain(uid: str) -> str: + """Get the management chain from a person up to the top of the org. + + Walks up through manager_uid relationships. Returns an ordered list from + the person to the top of the org. Use this to answer 'who is the VP over X' + or 'what is the reporting chain for X'. + """ + conn = _get_conn() + sql = """ + WITH RECURSIVE chain AS ( + SELECT user_id, name, job_title, email, location, manager_uid, 0 AS level + FROM person + WHERE user_id = ? 
+ UNION ALL + SELECT p.user_id, p.name, p.job_title, p.email, p.location, p.manager_uid, c.level + 1 + FROM person p + JOIN chain c ON c.manager_uid = p.user_id + WHERE c.manager_uid IS NOT NULL AND c.manager_uid != '' + AND c.level < 20 + ) + SELECT user_id, name, job_title, email, location, manager_uid, level + FROM chain + ORDER BY level + """ + rows = conn.execute(sql, (uid,)).fetchall() + cols = ["user_id", "name", "job_title", "email", "location", "manager_uid", "level"] + results = [dict(zip(cols, row)) for row in rows] + return json.dumps( + {"uid": uid, "chain": results, "depth": len(results)}, + default=str, + ) + + @mcp.tool( annotations={"readOnlyHint": True, "openWorldHint": False}, ) @@ -590,6 +763,437 @@ def release_risk_summary(release: str | None = None) -> str: return json.dumps({"releases": results, "assessed_on": today.isoformat()}, default=str) +# --------------------------------------------------------------------------- +# Cloud pricing tools +# --------------------------------------------------------------------------- + + +@mcp.tool( + annotations={"readOnlyHint": True, "openWorldHint": False}, +) +def cloud_pricing_lookup( + provider: str | None = None, + service: str | None = None, + instance_type: str | None = None, + region: str | None = None, + model_name: str | None = None, + usage_type: str | None = None, + limit: int = 50, +) -> str: + """Look up cloud pricing for AWS services or Claude models. + + Filters (all optional, case-insensitive partial match): + provider: 'aws' or 'anthropic' + service: 'ec2', 's3', 'ebs', 'elb', 'data_transfer', 'vertex_ai' + instance_type: e.g. 'm5.xlarge' + region: e.g. 'us-east-1' + model_name: e.g. 'claude-sonnet-4' + usage_type: 'OnDemand', 'per-token-input', 'per-token-output' + Provide at least one filter. Results capped at limit (default 50, max 200). + """ + conn = _get_conn() + if not _db_attached(conn, "pricing"): + return json.dumps({"error": "pricing.db not attached. 
Run: uv run scripts/fetch_pricing.py"}) + + if not any([provider, service, instance_type, region, model_name, usage_type]): + return json.dumps({"error": "Provide at least one filter"}) + + limit = min(limit, MAX_QUERY_ROWS) + conditions, params = [], [] + + if provider: + conditions.append("provider = ?") + params.append(provider.lower()) + if service: + conditions.append("service = ?") + params.append(service.lower()) + if instance_type: + conditions.append("instance_type LIKE ?") + params.append(f"%{instance_type}%") + if region: + conditions.append("region LIKE ?") + params.append(f"%{region}%") + if model_name: + conditions.append("model_name LIKE ?") + params.append(f"%{model_name}%") + if usage_type: + conditions.append("usage_type LIKE ?") + params.append(f"%{usage_type}%") + + where = " AND ".join(conditions) + sql = f""" + SELECT provider, service, region, instance_type, instance_family, + vcpu, memory_gb, storage_type, usage_type, unit, + price_per_unit, model_name, description, effective_date + FROM pricing.cloud_pricing + WHERE {where} + ORDER BY provider, service, price_per_unit + LIMIT ? + """ + params.append(limit) + rows = conn.execute(sql, params).fetchall() + return json.dumps({"pricing": _rows_to_dicts(rows), "count": len(rows)}, default=str) + + +@mcp.tool( + annotations={"readOnlyHint": True, "openWorldHint": False}, +) +def rosa_cluster_costs() -> str: + """Get estimated monthly costs for OpenShift/ROSA cluster nodes. + + Joins instance types with EC2 pricing to estimate costs. + Requires pricing.db with both AWS pricing and ROSA cluster discovery data. + """ + conn = _get_conn() + if not _db_attached(conn, "pricing"): + return json.dumps({"error": "pricing.db not attached. Run: uv run scripts/fetch_pricing.py"}) + + rows = conn.execute("SELECT * FROM pricing.v_rosa_estimated_cost ORDER BY estimated_monthly_cost DESC").fetchall() + if not rows: + return json.dumps({"error": "No ROSA cluster data. 
Run: uv run scripts/fetch_pricing.py (requires oc access)"}) + return json.dumps({"clusters": _rows_to_dicts(rows), "count": len(rows)}, default=str) + + +# --------------------------------------------------------------------------- +# GitHub tools +# --------------------------------------------------------------------------- + + +@mcp.tool( + annotations={"readOnlyHint": True, "openWorldHint": False}, +) +def github_org_summary() -> str: + """Get org-wide GitHub stats: repos, commits, PRs, issues, releases, contributors, LOC by language. + + Requires github.db to be built via: uv run scripts/fetch_github.py --org YOUR_ORG + """ + conn = _get_conn() + if not _db_attached(conn, "github"): + return json.dumps({"error": "github.db not attached. Run: uv run scripts/fetch_github.py --org YOUR_ORG"}) + + stats = conn.execute("SELECT * FROM github.v_gh_org_stats").fetchone() + result = dict(stats) + + # Top languages + langs = conn.execute( + """SELECT language, SUM(bytes) AS total_bytes + FROM github.gh_repo_language + GROUP BY language ORDER BY total_bytes DESC LIMIT 20""" + ).fetchall() + result["languages"] = _rows_to_dicts(langs) + + # Top repos by commits + top_repos = conn.execute( + """SELECT name, commit_count, pr_count, merged_pr_count, issue_count, + contributor_count, total_language_bytes + FROM github.v_gh_repo_summary + ORDER BY commit_count DESC LIMIT 10""" + ).fetchall() + result["top_repos"] = _rows_to_dicts(top_repos) + + return json.dumps(result, default=str) + + +@mcp.tool( + annotations={"readOnlyHint": True, "openWorldHint": False}, +) +def search_github_repos( + name: str | None = None, + language: str | None = None, + topic: str | None = None, + limit: int = 50, +) -> str: + """Search GitHub repos by name, language, or topic. All filters are partial match. + + Provide at least one parameter. Returns repo details with language breakdown. + Returns up to `limit` repos (default 50, max 200). 
+ """ + if not any([name, language, topic]): + return json.dumps({"error": "Provide at least one of: name, language, topic"}) + + conn = _get_conn() + if not _db_attached(conn, "github"): + return json.dumps({"error": "github.db not attached. Run: uv run scripts/fetch_github.py --org YOUR_ORG"}) + + limit = min(limit, MAX_QUERY_ROWS) + conditions, params = [], [] + if name: + conditions.append("r.name LIKE ?") + params.append(f"%{name}%") + if language: + conditions.append( + "EXISTS (SELECT 1 FROM github.gh_repo_language l WHERE l.repo_id = r.repo_id AND l.language LIKE ?)" + ) + params.append(f"%{language}%") + if topic: + conditions.append( + "EXISTS (SELECT 1 FROM github.gh_repo_topic t WHERE t.repo_id = r.repo_id AND t.topic LIKE ?)" + ) + params.append(f"%{topic}%") + + where = " AND ".join(conditions) + repos = conn.execute( + f"""SELECT r.repo_id, r.name, r.description, r.html_url, r.default_branch, + r.is_fork, r.is_archived, r.stars, r.forks, r.open_issues, + r.size_kb, r.created_at, r.pushed_at, + r.commit_count, r.pr_count, r.merged_pr_count, r.issue_count, + r.contributor_count, r.topics, r.total_language_bytes + FROM github.v_gh_repo_summary r + WHERE {where} + ORDER BY r.commit_count DESC + LIMIT ?""", + [*params, limit], + ).fetchall() + + # Batch-load languages for all repos to avoid N+1 queries + repo_ids = [repo["repo_id"] for repo in repos] + langs_by_repo: dict[int, list[dict]] = {} + if repo_ids: + ph = ",".join("?" 
* len(repo_ids)) + lang_rows = conn.execute( + f"SELECT repo_id, language, bytes FROM github.gh_repo_language WHERE repo_id IN ({ph}) ORDER BY bytes DESC", + repo_ids, + ).fetchall() + for lr in lang_rows: + langs_by_repo.setdefault(lr["repo_id"], []).append({"language": lr["language"], "bytes": lr["bytes"]}) + + results = [] + for repo in repos: + r = dict(repo) + rid = r.pop("repo_id") + r["languages"] = langs_by_repo.get(rid, []) + results.append(r) + + return json.dumps({"repos": results, "count": len(results)}, default=str) + + +@mcp.tool( + annotations={"readOnlyHint": True, "openWorldHint": False}, +) +def search_github_commits( + repo: str | None = None, + author: str | None = None, + keyword: str | None = None, + since: str | None = None, + until: str | None = None, + limit: int = 50, +) -> str: + """Search GitHub commits with optional filters. All are case-insensitive partial match. + + Provide at least one filter. since/until are ISO dates (e.g. 2026-03-01). + """ + if not any([repo, author, keyword, since, until]): + return json.dumps({"error": "Provide at least one filter"}) + + conn = _get_conn() + if not _db_attached(conn, "github"): + return json.dumps({"error": "github.db not attached. 
Run: uv run scripts/fetch_github.py --org YOUR_ORG"}) + + limit = min(limit, MAX_QUERY_ROWS) + conditions, params = [], [] + + if repo: + conditions.append("r.name LIKE ?") + params.append(f"%{repo}%") + if author: + conditions.append("c.author_login LIKE ?") + params.append(f"%{author}%") + if keyword: + conditions.append("c.message LIKE ?") + params.append(f"%{keyword}%") + if since: + conditions.append("c.author_date >= ?") + params.append(since) + if until: + conditions.append("c.author_date <= ?") + params.append(until) + + where = " AND ".join(conditions) + rows = conn.execute( + f"""SELECT r.name AS repo, c.sha, c.author_login, c.author_date, c.message + FROM github.gh_commit c + JOIN github.gh_repo r ON c.repo_id = r.repo_id + WHERE {where} + ORDER BY c.author_date DESC + LIMIT ?""", + [*params, limit], + ).fetchall() + return json.dumps({"commits": _rows_to_dicts(rows), "count": len(rows)}, default=str) + + +@mcp.tool( + annotations={"readOnlyHint": True, "openWorldHint": False}, +) +def search_github_prs( + repo: str | None = None, + state: str | None = None, + author: str | None = None, + keyword: str | None = None, + limit: int = 50, +) -> str: + """Search GitHub pull requests. state can be: open, closed, merged (merged_at IS NOT NULL). + + Provide at least one filter. All text filters are partial match. + """ + if not any([repo, state, author, keyword]): + return json.dumps({"error": "Provide at least one filter"}) + + conn = _get_conn() + if not _db_attached(conn, "github"): + return json.dumps({"error": "github.db not attached. 
Run: uv run scripts/fetch_github.py --org YOUR_ORG"}) + + limit = min(limit, MAX_QUERY_ROWS) + conditions, params = [], [] + + if repo: + conditions.append("r.name LIKE ?") + params.append(f"%{repo}%") + if state: + if state.lower() == "merged": + conditions.append("p.merged_at IS NOT NULL") + else: + conditions.append("p.state LIKE ?") + params.append(f"%{state}%") + if author: + conditions.append("p.author_login LIKE ?") + params.append(f"%{author}%") + if keyword: + conditions.append("p.title LIKE ?") + params.append(f"%{keyword}%") + + where = " AND ".join(conditions) + rows = conn.execute( + f"""SELECT r.name AS repo, p.number, p.title, p.state, p.author_login, + p.created_at, p.updated_at, p.merged_at, + p.additions, p.deletions, p.changed_files, p.html_url, + (SELECT GROUP_CONCAT(l.label, ', ') + FROM github.gh_pr_label l WHERE l.pr_id = p.pr_id) AS labels, + (SELECT COUNT(*) FROM github.gh_pr_review rv + WHERE rv.pr_id = p.pr_id AND rv.state = 'APPROVED') AS approvals + FROM github.gh_pull_request p + JOIN github.gh_repo r ON p.repo_id = r.repo_id + WHERE {where} + ORDER BY p.updated_at DESC + LIMIT ?""", + [*params, limit], + ).fetchall() + return json.dumps({"prs": _rows_to_dicts(rows), "count": len(rows)}, default=str) + + +@mcp.tool( + annotations={"readOnlyHint": True, "openWorldHint": False}, +) +def search_github_issues( + repo: str | None = None, + state: str | None = None, + author: str | None = None, + keyword: str | None = None, + limit: int = 50, +) -> str: + """Search GitHub issues (not PRs). All filters are case-insensitive partial match. + + Provide at least one filter. + """ + if not any([repo, state, author, keyword]): + return json.dumps({"error": "Provide at least one filter"}) + + conn = _get_conn() + if not _db_attached(conn, "github"): + return json.dumps({"error": "github.db not attached. 
Run: uv run scripts/fetch_github.py --org YOUR_ORG"}) + + limit = min(limit, MAX_QUERY_ROWS) + conditions, params = [], [] + + if repo: + conditions.append("r.name LIKE ?") + params.append(f"%{repo}%") + if state: + conditions.append("i.state LIKE ?") + params.append(f"%{state}%") + if author: + conditions.append("i.author_login LIKE ?") + params.append(f"%{author}%") + if keyword: + conditions.append("i.title LIKE ?") + params.append(f"%{keyword}%") + + where = " AND ".join(conditions) + rows = conn.execute( + f"""SELECT r.name AS repo, i.number, i.title, i.state, i.author_login, + i.created_at, i.updated_at, i.closed_at, i.html_url, + (SELECT GROUP_CONCAT(l.label, ', ') + FROM github.gh_issue_label l WHERE l.issue_id = i.issue_id) AS labels + FROM github.gh_issue i + JOIN github.gh_repo r ON i.repo_id = r.repo_id + WHERE {where} + ORDER BY i.updated_at DESC + LIMIT ?""", + [*params, limit], + ).fetchall() + return json.dumps({"issues": _rows_to_dicts(rows), "count": len(rows)}, default=str) + + +@mcp.tool( + annotations={"readOnlyHint": True, "openWorldHint": False}, +) +def github_code_stats(repo: str | None = None) -> str: + """Get LOC stats by language and weekly code frequency. Per-repo or org-wide. + + Without repo parameter, returns org-wide totals. + """ + conn = _get_conn() + if not _db_attached(conn, "github"): + return json.dumps({"error": "github.db not attached. Run: uv run scripts/fetch_github.py --org YOUR_ORG"}) + + result: dict = {} + + if repo: + repo_row = conn.execute( + "SELECT repo_id, name FROM github.gh_repo WHERE name LIKE ?", + (f"%{repo}%",), + ).fetchone() + if not repo_row: + return json.dumps({"error": f"No repo matching '{repo}'"}) + + repo_id = repo_row["repo_id"] + result["repo"] = repo_row["name"] + + langs = conn.execute( + "SELECT language, bytes FROM github.gh_repo_language WHERE repo_id = ? 
ORDER BY bytes DESC", + (repo_id,), + ).fetchall() + result["languages"] = _rows_to_dicts(langs) + result["total_bytes"] = sum(r["bytes"] for r in langs) + + freq = conn.execute( + """SELECT week_ts, additions, deletions FROM github.gh_code_frequency + WHERE repo_id = ? ORDER BY week_ts DESC LIMIT 52""", + (repo_id,), + ).fetchall() + result["weekly_frequency"] = _rows_to_dicts(freq) + result["total_additions"] = sum(r["additions"] for r in freq) + result["total_deletions"] = sum(abs(r["deletions"]) for r in freq) + else: + langs = conn.execute( + """SELECT language, SUM(bytes) AS total_bytes, COUNT(DISTINCT repo_id) AS repo_count + FROM github.gh_repo_language + GROUP BY language ORDER BY total_bytes DESC""" + ).fetchall() + result["languages"] = _rows_to_dicts(langs) + result["total_bytes"] = sum(r["total_bytes"] for r in langs) + + freq = conn.execute( + """SELECT week_ts, SUM(additions) AS additions, SUM(deletions) AS deletions + FROM github.gh_code_frequency + GROUP BY week_ts ORDER BY week_ts DESC LIMIT 52""" + ).fetchall() + result["weekly_frequency"] = _rows_to_dicts(freq) + result["total_additions"] = sum(r["additions"] for r in freq) + result["total_deletions"] = sum(abs(r["deletions"]) for r in freq) + + return json.dumps(result, default=str) + + # --------------------------------------------------------------------------- # Governance tools # --------------------------------------------------------------------------- @@ -699,6 +1303,20 @@ def get_gps_version() -> str: return json.dumps(result, default=str) +# --------------------------------------------------------------------------- +# Local extensions (not tracked upstream) +# --------------------------------------------------------------------------- + +_ext_dir = Path(__file__).resolve().parent / "extensions" +if _ext_dir.is_dir(): + for _ext in sorted(_ext_dir.glob("*.py")): + if _ext.name.startswith("_"): + continue + try: + importlib.import_module(f"extensions.{_ext.stem}") + except Exception as 
_e: + logging.getLogger(__name__).warning("Failed to load extension %s: %s", _ext.name, _e) + # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- diff --git a/scripts/check-no-secrets.sh b/scripts/check-no-secrets.sh new file mode 100755 index 0000000..c1adce0 --- /dev/null +++ b/scripts/check-no-secrets.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash +# Pre-commit hook: check staged files for possible secrets. +# +# Scans for common secret patterns: API tokens, passwords, bearer tokens, +# client secrets, private keys, and connection strings. + +found=0 +for f in "$@"; do + if grep -qE '(API_TOKEN|API_SECRET|API_KEY|PASSWORD|PRIVATE_KEY|Bearer |client_secret|-----BEGIN .* KEY-----)' "$f" 2>/dev/null; then + echo " $f" + found=1 + fi +done + +if [ "$found" = "1" ]; then + echo "BLOCKED: Possible secret detected in file(s) above" + exit 1 +fi +exit 0 diff --git a/scripts/check-protected-files.sh b/scripts/check-protected-files.sh new file mode 100755 index 0000000..4016c24 --- /dev/null +++ b/scripts/check-protected-files.sh @@ -0,0 +1,38 @@ +#!/usr/bin/env bash +# Pre-commit hook: block changes to protected files. +# +# Protected paths are defined in the PROTECTED_PATHS variable below. +# Customize for your project. +# +# To bypass for intentional changes, use: +# ALLOW_PROTECTED=1 git commit -m "description" + +if [ "${ALLOW_PROTECTED}" = "1" ]; then + exit 0 +fi + +# Define protected file patterns (space-separated globs). +# Override via PROTECTED_PATHS env var if needed. 
+PROTECTED_PATHS="${PROTECTED_PATHS:-governance/}" + +changed=$(git diff --cached --name-only) +blocked="" + +for pattern in $PROTECTED_PATHS; do + for file in $changed; do + case "$file" in + $pattern*) + blocked="$blocked $file" + ;; + esac + done +done + +if [ -n "$blocked" ]; then + echo "BLOCKED: Protected files modified:$blocked" + echo "" + echo "To commit intentional changes:" + echo " ALLOW_PROTECTED=1 git commit -m 'description of change'" + exit 1 +fi +exit 0 diff --git a/scripts/fetch_github.py b/scripts/fetch_github.py new file mode 100644 index 0000000..b4b9e18 --- /dev/null +++ b/scripts/fetch_github.py @@ -0,0 +1,1026 @@ +#!/usr/bin/env python3 +# /// script +# requires-python = ">=3.11" +# dependencies = [] +# /// +"""Fetch GitHub org data into data/github.db. + +Builds a separate SQLite database with repos, commits, PRs, issues, releases, +contributors, and code frequency for a specified GitHub org. +Uses `gh api` via subprocess (must be authenticated). + +Usage: + uv run scripts/fetch_github.py --org MY_ORG # incremental from _meta + uv run scripts/fetch_github.py --org MY_ORG --repo my-repo # single repo + uv run scripts/fetch_github.py --org MY_ORG --since 2026-03-01 + uv run scripts/fetch_github.py --org MY_ORG --full # ignore _meta, fetch all + uv run scripts/fetch_github.py --org MY_ORG --skip-commits # fast metadata-only refresh +""" + +import argparse +import json +import sqlite3 +import subprocess +import sys +import time +from datetime import datetime, timezone +from pathlib import Path + +SCRIPT_DIR = Path(__file__).resolve().parent +REPO_ROOT = SCRIPT_DIR.parent +DATA_DIR = REPO_ROOT / "data" +DB_PATH = DATA_DIR / "github.db" + +API_DELAY = 0.5 +SLOW_DELAY = 2.0 + +# Track rate limit state from last API call +_rate_limit_remaining: int | None = None +_rate_limit_reset: float | None = None + +# --------------------------------------------------------------------------- +# GitHub API via `gh` +# 
--------------------------------------------------------------------------- + + +def gh_api( + endpoint: str, + paginate: bool = False, + jq: str | None = None, + method: str = "GET", +) -> dict | list | None: + """Call GitHub API via `gh api`. Returns parsed JSON or None on error.""" + global _rate_limit_remaining, _rate_limit_reset + + # Check rate limit before calling + if _rate_limit_remaining is not None and _rate_limit_remaining < 3: + if _rate_limit_reset: + wait = max(0, _rate_limit_reset - time.time()) + 1 + print(f" Rate limit nearly exhausted, sleeping {wait:.0f}s...") + time.sleep(wait) + + delay = API_DELAY + if _rate_limit_remaining is not None and _rate_limit_remaining < 10: + delay = SLOW_DELAY + + time.sleep(delay) + + cmd = ["gh", "api"] + if paginate: + cmd.append("--paginate") + if jq: + cmd.extend(["--jq", jq]) + if method != "GET": + cmd.extend(["--method", method]) + # Include response headers so we can parse rate limit info + cmd.extend(["--include", endpoint]) + + for attempt in range(3): + try: + result = subprocess.run(cmd, capture_output=True, text=True, timeout=120) + except subprocess.TimeoutExpired: + print(f" Timeout on {endpoint}, attempt {attempt + 1}/3") + time.sleep(2 ** (attempt + 1)) + continue + + # Parse headers and body from --include output + output = result.stdout + headers_str = "" + body_str = output + + # --include puts headers before blank line then body + if "\r\n\r\n" in output: + headers_str, body_str = output.split("\r\n\r\n", 1) + elif "\n\n" in output: + headers_str, body_str = output.split("\n\n", 1) + + # Parse rate limit headers + for line in headers_str.splitlines(): + if line.lower().startswith("x-ratelimit-remaining:"): + try: + _rate_limit_remaining = int(line.split(":", 1)[1].strip()) + except ValueError: + pass + elif line.lower().startswith("x-ratelimit-reset:"): + try: + _rate_limit_reset = float(line.split(":", 1)[1].strip()) + except ValueError: + pass + + if result.returncode != 0: + stderr = 
def _decode_json_documents(text: str) -> list:
    """Decode every JSON document laid end to end in *text*.

    ``json.loads`` accepts exactly one document; this walks the string with
    ``JSONDecoder.raw_decode`` so several documents, with or without
    whitespace between them, are all decoded.

    Raises:
        json.JSONDecodeError: if any part of *text* is not valid JSON.
    """
    decoder = json.JSONDecoder()
    documents = []
    pos = 0
    end = len(text)
    while pos < end:
        # raw_decode does not skip leading whitespace itself.
        if text[pos].isspace():
            pos += 1
            continue
        document, pos = decoder.raw_decode(text, pos)
        documents.append(document)
    return documents


def gh_api_simple(endpoint: str, paginate: bool = False) -> dict | list | None:
    """Call ``gh api`` without ``--include`` and return the decoded JSON body.

    Args:
        endpoint: API path, handed to ``gh api`` unchanged.
        paginate: Add ``--paginate``; per-page arrays are merged into one list.

    Returns:
        The decoded JSON value, or ``None`` when the command reports a
        non-retryable error, the body is empty or cannot be decoded, or all
        three attempts are used up.
    """
    # Throttle according to the rate-limit state held in the module globals.
    delay = API_DELAY
    if _rate_limit_remaining is not None and _rate_limit_remaining < 10:
        delay = SLOW_DELAY
    if _rate_limit_remaining is not None and _rate_limit_remaining < 3:
        if _rate_limit_reset:
            wait = max(0, _rate_limit_reset - time.time()) + 1
            print(f" Rate limit nearly exhausted, sleeping {wait:.0f}s...")
            time.sleep(wait)

    time.sleep(delay)

    cmd = ["gh", "api"]
    if paginate:
        cmd.append("--paginate")
    cmd.append(endpoint)

    for attempt in range(3):
        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
        except subprocess.TimeoutExpired:
            print(f" Timeout on {endpoint}, attempt {attempt + 1}/3")
            time.sleep(2 ** (attempt + 1))
            continue

        if result.returncode != 0:
            stderr = result.stderr.strip()
            if "403" in stderr or "rate limit" in stderr.lower():
                time.sleep(60)
                continue
            if any(str(c) in stderr for c in range(500, 600)):
                time.sleep(2 ** (attempt + 1))
                continue
            print(f" gh api error on {endpoint}: {stderr}", file=sys.stderr)
            return None

        body = result.stdout.strip()
        if not body:
            return None

        try:
            if not paginate:
                return json.loads(body)
            # Paginated output may be several JSON documents back to back.
            # Decode them one by one instead of splicing the text, so string
            # values containing "][" and empty pages ("[]") stay intact.
            pages = _decode_json_documents(body)
        except json.JSONDecodeError:
            print(f" JSON decode error on {endpoint}", file=sys.stderr)
            return None

        if len(pages) == 1:
            return pages[0]
        if all(isinstance(page, list) for page in pages):
            return [item for page in pages for item in page]
        # Several non-array documents cannot be merged into one value.
        print(f" JSON decode error on {endpoint}", file=sys.stderr)
        return None

    print(f" Exhausted retries on {endpoint}", file=sys.stderr)
    return None
contributions INTEGER DEFAULT 0, + PRIMARY KEY (repo_id, user_id), + FOREIGN KEY (repo_id) REFERENCES gh_repo(repo_id), + FOREIGN KEY (user_id) REFERENCES gh_user(user_id) +); + +CREATE TABLE IF NOT EXISTS gh_commit ( + repo_id INTEGER NOT NULL, + sha TEXT NOT NULL, + author_login TEXT, + author_date TEXT, + message TEXT, + additions INTEGER, + deletions INTEGER, + fetched_at TEXT NOT NULL, + PRIMARY KEY (repo_id, sha), + FOREIGN KEY (repo_id) REFERENCES gh_repo(repo_id) +); + +CREATE TABLE IF NOT EXISTS gh_pull_request ( + pr_id INTEGER PRIMARY KEY, + repo_id INTEGER NOT NULL, + number INTEGER NOT NULL, + title TEXT, + state TEXT, + author_login TEXT, + created_at TEXT, + updated_at TEXT, + closed_at TEXT, + merged_at TEXT, + additions INTEGER, + deletions INTEGER, + changed_files INTEGER, + html_url TEXT, + fetched_at TEXT NOT NULL, + UNIQUE(repo_id, number), + FOREIGN KEY (repo_id) REFERENCES gh_repo(repo_id) +); + +CREATE TABLE IF NOT EXISTS gh_pr_label ( + pr_id INTEGER NOT NULL, + label TEXT NOT NULL, + PRIMARY KEY (pr_id, label), + FOREIGN KEY (pr_id) REFERENCES gh_pull_request(pr_id) +); + +CREATE TABLE IF NOT EXISTS gh_pr_assignee ( + pr_id INTEGER NOT NULL, + login TEXT NOT NULL, + PRIMARY KEY (pr_id, login), + FOREIGN KEY (pr_id) REFERENCES gh_pull_request(pr_id) +); + +CREATE TABLE IF NOT EXISTS gh_pr_reviewer ( + pr_id INTEGER NOT NULL, + login TEXT NOT NULL, + PRIMARY KEY (pr_id, login), + FOREIGN KEY (pr_id) REFERENCES gh_pull_request(pr_id) +); + +CREATE TABLE IF NOT EXISTS gh_pr_review ( + review_id INTEGER PRIMARY KEY, + pr_id INTEGER NOT NULL, + user_login TEXT, + state TEXT, + submitted_at TEXT, + FOREIGN KEY (pr_id) REFERENCES gh_pull_request(pr_id) +); + +CREATE TABLE IF NOT EXISTS gh_issue ( + issue_id INTEGER PRIMARY KEY, + repo_id INTEGER NOT NULL, + number INTEGER NOT NULL, + title TEXT, + state TEXT, + author_login TEXT, + created_at TEXT, + updated_at TEXT, + closed_at TEXT, + html_url TEXT, + fetched_at TEXT NOT NULL, + UNIQUE(repo_id, 
number), + FOREIGN KEY (repo_id) REFERENCES gh_repo(repo_id) +); + +CREATE TABLE IF NOT EXISTS gh_issue_label ( + issue_id INTEGER NOT NULL, + label TEXT NOT NULL, + PRIMARY KEY (issue_id, label), + FOREIGN KEY (issue_id) REFERENCES gh_issue(issue_id) +); + +CREATE TABLE IF NOT EXISTS gh_issue_assignee ( + issue_id INTEGER NOT NULL, + login TEXT NOT NULL, + PRIMARY KEY (issue_id, login), + FOREIGN KEY (issue_id) REFERENCES gh_issue(issue_id) +); + +CREATE TABLE IF NOT EXISTS gh_release ( + release_id INTEGER PRIMARY KEY, + repo_id INTEGER NOT NULL, + tag_name TEXT NOT NULL, + name TEXT, + draft INTEGER DEFAULT 0, + prerelease INTEGER DEFAULT 0, + created_at TEXT, + published_at TEXT, + author_login TEXT, + html_url TEXT, + fetched_at TEXT NOT NULL, + UNIQUE(repo_id, tag_name), + FOREIGN KEY (repo_id) REFERENCES gh_repo(repo_id) +); + +CREATE TABLE IF NOT EXISTS gh_release_asset ( + asset_id INTEGER PRIMARY KEY, + release_id INTEGER NOT NULL, + name TEXT, + size_bytes INTEGER DEFAULT 0, + download_count INTEGER DEFAULT 0, + content_type TEXT, + FOREIGN KEY (release_id) REFERENCES gh_release(release_id) +); + +CREATE TABLE IF NOT EXISTS gh_code_frequency ( + repo_id INTEGER NOT NULL, + week_ts INTEGER NOT NULL, + additions INTEGER DEFAULT 0, + deletions INTEGER DEFAULT 0, + PRIMARY KEY (repo_id, week_ts), + FOREIGN KEY (repo_id) REFERENCES gh_repo(repo_id) +); + +CREATE TABLE IF NOT EXISTS _meta ( + key TEXT PRIMARY KEY, + value TEXT +); + +-- Views + +CREATE VIEW IF NOT EXISTS v_gh_repo_summary AS +SELECT + r.repo_id, r.name, r.description, r.html_url, r.default_branch, + r.is_fork, r.is_archived, r.stars, r.forks, r.open_issues, r.size_kb, + r.created_at, r.updated_at, r.pushed_at, + (SELECT COUNT(*) FROM gh_commit c WHERE c.repo_id = r.repo_id) AS commit_count, + (SELECT COUNT(*) FROM gh_pull_request p WHERE p.repo_id = r.repo_id) AS pr_count, + (SELECT COUNT(*) FROM gh_pull_request p WHERE p.repo_id = r.repo_id AND p.merged_at IS NOT NULL) AS merged_pr_count, + 
(SELECT COUNT(*) FROM gh_issue i WHERE i.repo_id = r.repo_id) AS issue_count, + (SELECT COUNT(DISTINCT c2.user_id) FROM gh_contributor c2 WHERE c2.repo_id = r.repo_id) AS contributor_count, + (SELECT GROUP_CONCAT(t.topic, ', ') FROM gh_repo_topic t WHERE t.repo_id = r.repo_id) AS topics, + (SELECT SUM(l.bytes) FROM gh_repo_language l WHERE l.repo_id = r.repo_id) AS total_language_bytes +FROM gh_repo r; + +CREATE VIEW IF NOT EXISTS v_gh_org_stats AS +SELECT + (SELECT COUNT(*) FROM gh_repo) AS total_repos, + (SELECT COUNT(*) FROM gh_repo WHERE is_archived = 0) AS active_repos, + (SELECT COUNT(*) FROM gh_commit) AS total_commits, + (SELECT COUNT(*) FROM gh_pull_request) AS total_prs, + (SELECT COUNT(*) FROM gh_pull_request WHERE merged_at IS NOT NULL) AS merged_prs, + (SELECT COUNT(*) FROM gh_issue) AS total_issues, + (SELECT COUNT(*) FROM gh_release) AS total_releases, + (SELECT COUNT(DISTINCT login) FROM gh_user) AS total_contributors, + (SELECT SUM(bytes) FROM gh_repo_language) AS total_language_bytes, + (SELECT SUM(additions) FROM gh_code_frequency) AS total_additions, + (SELECT SUM(ABS(deletions)) FROM gh_code_frequency) AS total_deletions; + +CREATE VIEW IF NOT EXISTS v_gh_contributor_summary AS +SELECT + u.login, u.name, u.user_id, + COUNT(DISTINCT c.repo_id) AS repos_contributed, + SUM(c.contributions) AS total_contributions, + (SELECT COUNT(*) FROM gh_pull_request p WHERE p.author_login = u.login) AS prs_authored, + (SELECT COUNT(*) FROM gh_pull_request p WHERE p.author_login = u.login AND p.merged_at IS NOT NULL) AS prs_merged, + (SELECT COUNT(*) FROM gh_pr_review rv WHERE rv.user_login = u.login) AS reviews_given, + (SELECT COUNT(*) FROM gh_commit cm WHERE cm.author_login = u.login) AS commits_authored +FROM gh_user u +JOIN gh_contributor c ON u.user_id = c.user_id +GROUP BY u.user_id; + +-- Indexes + +CREATE INDEX IF NOT EXISTS idx_gh_commit_repo_date ON gh_commit(repo_id, author_date DESC); +CREATE INDEX IF NOT EXISTS idx_gh_commit_author ON 
gh_commit(author_login); +CREATE INDEX IF NOT EXISTS idx_gh_commit_message ON gh_commit(message); +CREATE INDEX IF NOT EXISTS idx_gh_pr_repo_state ON gh_pull_request(repo_id, state); +CREATE INDEX IF NOT EXISTS idx_gh_pr_author ON gh_pull_request(author_login); +CREATE INDEX IF NOT EXISTS idx_gh_pr_merged ON gh_pull_request(merged_at) WHERE merged_at IS NOT NULL; +CREATE INDEX IF NOT EXISTS idx_gh_pr_updated ON gh_pull_request(updated_at DESC); +CREATE INDEX IF NOT EXISTS idx_gh_pr_created ON gh_pull_request(created_at DESC); +CREATE INDEX IF NOT EXISTS idx_gh_issue_repo_state ON gh_issue(repo_id, state); +CREATE INDEX IF NOT EXISTS idx_gh_issue_author ON gh_issue(author_login); +CREATE INDEX IF NOT EXISTS idx_gh_issue_updated ON gh_issue(updated_at DESC); +CREATE INDEX IF NOT EXISTS idx_gh_release_repo ON gh_release(repo_id); +CREATE INDEX IF NOT EXISTS idx_gh_code_freq_repo ON gh_code_frequency(repo_id); +CREATE INDEX IF NOT EXISTS idx_gh_contributor_repo ON gh_contributor(repo_id); +CREATE INDEX IF NOT EXISTS idx_gh_contributor_user ON gh_contributor(user_id); +CREATE INDEX IF NOT EXISTS idx_gh_pr_review_pr ON gh_pr_review(pr_id); +CREATE INDEX IF NOT EXISTS idx_gh_pr_review_user ON gh_pr_review(user_login); +CREATE INDEX IF NOT EXISTS idx_gh_user_login ON gh_user(login); +""" + + +def init_db() -> sqlite3.Connection: + """Create github.db and apply schema.""" + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode = WAL") + conn.executescript(SCHEMA_SQL) + return conn + + +# --------------------------------------------------------------------------- +# Fetch functions +# --------------------------------------------------------------------------- + + +def upsert_user(conn: sqlite3.Connection, user: dict, now: str) -> int | None: + """Upsert a GitHub user. 
Returns user_id or None.""" + if not user or not user.get("id"): + return None + conn.execute( + """INSERT INTO gh_user (user_id, login, name, avatar_url, html_url, user_type, fetched_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(user_id) DO UPDATE SET + login=excluded.login, name=excluded.name, + avatar_url=excluded.avatar_url, html_url=excluded.html_url, + user_type=excluded.user_type, fetched_at=excluded.fetched_at""", + ( + user["id"], + user.get("login", ""), + user.get("name"), + user.get("avatar_url"), + user.get("html_url"), + user.get("type", "User"), + now, + ), + ) + return user["id"] + + +def fetch_repos(conn: sqlite3.Connection, org: str, now: str) -> list[dict]: + """Fetch all repos in the org.""" + print(f"\nFetching repos for {org}...") + repos = gh_api_simple(f"/orgs/{org}/repos?per_page=100&type=all", paginate=True) + if not repos: + print(" No repos found or API error") + return [] + + for r in repos: + conn.execute( + """INSERT INTO gh_repo (repo_id, name, full_name, description, html_url, + default_branch, is_fork, is_archived, is_private, + created_at, updated_at, pushed_at, stars, forks, + open_issues, size_kb, fetched_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
def fetch_languages(conn: sqlite3.Connection, repo_id: int, name: str, org: str, now: str) -> None:
    """Replace the stored language breakdown of one repo.

    Args:
        conn: Open connection to the GitHub database.
        repo_id: Key of the repo in ``gh_repo``.
        name: Repository name used in the API path.
        org: Organisation that owns the repository.
        now: Fetch timestamp; unused here, kept for a signature parallel to
            the other ``fetch_*`` functions.
    """
    langs = gh_api_simple(f"/repos/{org}/{name}/languages")
    # Anything that is not a dict (None, a list) means the call did not
    # produce a language map, so the stored rows are kept. An empty dict is a
    # valid answer and must still clear rows left over from an earlier run.
    if not isinstance(langs, dict):
        return
    conn.execute("DELETE FROM gh_repo_language WHERE repo_id = ?", (repo_id,))
    conn.executemany(
        "INSERT OR REPLACE INTO gh_repo_language (repo_id, language, bytes) VALUES (?, ?, ?)",
        [(repo_id, lang, byte_count) for lang, byte_count in langs.items()],
    )
def fetch_commits(
    conn: sqlite3.Connection,
    repo_id: int,
    name: str,
    org: str,
    now: str,
    since: str | None = None,
) -> None:
    """Fetch commits for a repo and upsert them into ``gh_commit``.

    Args:
        conn: Open connection to the GitHub database.
        repo_id: Key of the repo in ``gh_repo``.
        name: Repository name used in the API path.
        org: Organisation that owns the repository.
        now: Timestamp stored in ``fetched_at``.
        since: Optional ISO 8601 lower bound, passed as the ``since`` query
            parameter.
    """
    endpoint = f"/repos/{org}/{name}/commits?per_page=100"
    if since:
        # An offset such as "+00:00" must not be pasted into a query string
        # as-is: "+" decodes as a space. Normalise to UTC "...Z" form, which
        # needs no escaping.
        try:
            moment = datetime.fromisoformat(since)
        except ValueError:
            # Not parseable here: pass it through, escaping only the "+".
            since_param = since.replace("+", "%2B")
        else:
            if moment.tzinfo is None:
                moment = moment.replace(tzinfo=timezone.utc)
            since_param = moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        endpoint += f"&since={since_param}"

    commits = gh_api_simple(endpoint, paginate=True)
    if not commits or not isinstance(commits, list):
        return

    print(f" {len(commits)} commits")
    for c in commits:
        # Any of these may be null in the payload; treat null like missing.
        author = c.get("author") or {}
        commit_data = c.get("commit") or {}
        author_info = commit_data.get("author") or {}
        conn.execute(
            """INSERT INTO gh_commit (repo_id, sha, author_login, author_date, message,
                   additions, deletions, fetched_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?)
               ON CONFLICT(repo_id, sha) DO UPDATE SET
                   author_login=excluded.author_login, message=excluded.message,
                   fetched_at=excluded.fetched_at""",
            (
                repo_id,
                c["sha"],
                author.get("login"),
                author_info.get("date"),
                (commit_data.get("message") or "")[:500],  # stored truncated to 500 chars
                None,  # additions not available in list endpoint
                None,  # deletions likewise
                now,
            ),
        )
+ ON CONFLICT(pr_id) DO UPDATE SET + title=excluded.title, state=excluded.state, + updated_at=excluded.updated_at, closed_at=excluded.closed_at, + merged_at=excluded.merged_at, additions=excluded.additions, + deletions=excluded.deletions, changed_files=excluded.changed_files, + fetched_at=excluded.fetched_at""", + ( + pr_id, + repo_id, + pr["number"], + pr.get("title"), + pr.get("state"), + (pr.get("user") or {}).get("login"), + pr.get("created_at"), + pr.get("updated_at"), + pr.get("closed_at"), + pr.get("merged_at"), + pr.get("additions"), + pr.get("deletions"), + pr.get("changed_files"), + pr.get("html_url"), + now, + ), + ) + + # Labels + conn.execute("DELETE FROM gh_pr_label WHERE pr_id = ?", (pr_id,)) + for lbl in pr.get("labels", []): + conn.execute( + "INSERT OR IGNORE INTO gh_pr_label (pr_id, label) VALUES (?, ?)", + (pr_id, lbl.get("name", "")), + ) + + # Assignees + conn.execute("DELETE FROM gh_pr_assignee WHERE pr_id = ?", (pr_id,)) + for a in pr.get("assignees", []): + conn.execute( + "INSERT OR IGNORE INTO gh_pr_assignee (pr_id, login) VALUES (?, ?)", + (pr_id, a.get("login", "")), + ) + + # Requested reviewers + conn.execute("DELETE FROM gh_pr_reviewer WHERE pr_id = ?", (pr_id,)) + for rv in pr.get("requested_reviewers", []): + conn.execute( + "INSERT OR IGNORE INTO gh_pr_reviewer (pr_id, login) VALUES (?, ?)", + (pr_id, rv.get("login", "")), + ) + fetched += 1 + + print(f" {fetched} PRs") + + +def fetch_pr_reviews(conn: sqlite3.Connection, repo_id: int, name: str, org: str, now: str) -> None: + """Fetch reviews for all PRs in a repo that don't have reviews yet.""" + # Get PRs that have no reviews stored + prs = conn.execute( + """SELECT pr_id, number FROM gh_pull_request + WHERE repo_id = ? 
def fetch_issues(
    conn: sqlite3.Connection,
    repo_id: int,
    name: str,
    org: str,
    now: str,
    since: str | None = None,
) -> None:
    """Fetch issues (excluding PRs) for a repo, with labels and assignees.

    Args:
        conn: Open connection to the GitHub database.
        repo_id: Key of the repo in ``gh_repo``.
        name: Repository name used in the API path.
        org: Organisation that owns the repository.
        now: Timestamp stored in ``fetched_at``.
        since: Optional ISO 8601 lower bound, passed as the ``since`` query
            parameter.
    """
    endpoint = f"/repos/{org}/{name}/issues?per_page=100&state=all&sort=updated&direction=desc"
    if since:
        # An offset such as "+00:00" must not be pasted into a query string
        # as-is: "+" decodes as a space. Normalise to UTC "...Z" form, which
        # needs no escaping.
        try:
            moment = datetime.fromisoformat(since)
        except ValueError:
            # Not parseable here: pass it through, escaping only the "+".
            since_param = since.replace("+", "%2B")
        else:
            if moment.tzinfo is None:
                moment = moment.replace(tzinfo=timezone.utc)
            since_param = moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        endpoint += f"&since={since_param}"

    issues = gh_api_simple(endpoint, paginate=True)
    if not issues or not isinstance(issues, list):
        return

    fetched = 0
    for issue in issues:
        # Skip pull requests (GitHub includes them in issues endpoint)
        if "pull_request" in issue:
            continue

        issue_id = issue["id"]
        conn.execute(
            """INSERT INTO gh_issue (issue_id, repo_id, number, title, state,
                   author_login, created_at, updated_at, closed_at, html_url, fetched_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
               ON CONFLICT(issue_id) DO UPDATE SET
                   title=excluded.title, state=excluded.state,
                   updated_at=excluded.updated_at, closed_at=excluded.closed_at,
                   fetched_at=excluded.fetched_at""",
            (
                issue_id,
                repo_id,
                issue["number"],
                issue.get("title"),
                issue.get("state"),
                (issue.get("user") or {}).get("login"),
                issue.get("created_at"),
                issue.get("updated_at"),
                issue.get("closed_at"),
                issue.get("html_url"),
                now,
            ),
        )

        # Labels and assignees are replaced wholesale so removals are picked up.
        conn.execute("DELETE FROM gh_issue_label WHERE issue_id = ?", (issue_id,))
        for lbl in issue.get("labels", []):
            conn.execute(
                "INSERT OR IGNORE INTO gh_issue_label (issue_id, label) VALUES (?, ?)",
                (issue_id, lbl.get("name", "")),
            )

        conn.execute("DELETE FROM gh_issue_assignee WHERE issue_id = ?", (issue_id,))
        for a in issue.get("assignees", []):
            conn.execute(
                "INSERT OR IGNORE INTO gh_issue_assignee (issue_id, login) VALUES (?, ?)",
                (issue_id, a.get("login", "")),
            )
        fetched += 1

    print(f" {fetched} issues")
def process_repo(
    conn: sqlite3.Connection,
    repo: dict,
    org: str,
    now: str,
    since: str | None = None,
    skip_commits: bool = False,
    full: bool = False,
) -> list[str]:
    """Fetch every entity type for one repo and commit the result.

    Args:
        conn: Open connection to the GitHub database.
        repo: Repo payload with ``id`` and ``name``; a sequence whose first
            two items are id and name is also accepted.
        org: Organisation that owns the repository.
        now: Timestamp for this run; stored in ``_meta`` on success.
        since: Explicit lower bound for incremental fetches.
        skip_commits: Leave out the commit fetch.
        full: Ignore the ``last_fetch_<name>`` value stored in ``_meta``.

    Returns:
        ``"<repo>/<entity>"`` strings for every fetcher that raised.
    """
    repo_id = repo["id"] if isinstance(repo, dict) else repo[0]
    name = repo["name"] if isinstance(repo, dict) else repo[1]
    print(f"\n--- {name} ---")

    # Determine incremental since for this repo
    repo_since = since
    if not full and not since:
        row = conn.execute("SELECT value FROM _meta WHERE key = ?", (f"last_fetch_{name}",)).fetchone()
        if row:
            repo_since = row[0]
            print(f" Incremental since {repo_since}")

    entity_fetchers = [
        ("languages", lambda: fetch_languages(conn, repo_id, name, org, now)),
        ("contributors", lambda: fetch_contributors(conn, repo_id, name, org, now)),
        ("releases", lambda: fetch_releases(conn, repo_id, name, org, now)),
        ("code_frequency", lambda: fetch_code_frequency(conn, repo_id, name, org, now)),
        ("prs", lambda: fetch_prs(conn, repo_id, name, org, now, repo_since)),
        ("pr_reviews", lambda: fetch_pr_reviews(conn, repo_id, name, org, now)),
        ("issues", lambda: fetch_issues(conn, repo_id, name, org, now, repo_since)),
    ]
    if not skip_commits:
        entity_fetchers.insert(
            0,
            ("commits", lambda: fetch_commits(conn, repo_id, name, org, now, repo_since)),
        )

    skipped: list[str] = []
    for entity_name, fetcher in entity_fetchers:
        try:
            fetcher()
        except Exception as e:  # one failing entity must not stop the others
            print(f" ERROR fetching {entity_name}: {e}", file=sys.stderr)
            skipped.append(f"{name}/{entity_name}")

    # Advance the incremental marker only after a clean pass. Moving it after
    # a failure would make the next run start from `now` and never retry the
    # window the failed fetcher missed.
    if not skipped:
        conn.execute(
            "INSERT INTO _meta (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value",
            (f"last_fetch_{name}", now),
        )

    # Commit after each repo
    conn.commit()
    return skipped
2026-03-01)") + parser.add_argument("--full", action="store_true", help="Ignore _meta, fetch all history") + parser.add_argument( + "--skip-commits", + action="store_true", + help="Skip commit fetching (faster metadata refresh)", + ) + parser.add_argument("--org", required=True, help="GitHub org to fetch data from") + args = parser.parse_args() + + conn = init_db() + now = datetime.now(timezone.utc).isoformat() + all_skipped: list[str] = [] + + try: + if args.repo: + # Fetch single repo + repo_data = gh_api_simple(f"/repos/{args.org}/{args.repo}") + if not repo_data: + print(f"ERROR: Repo {args.org}/{args.repo} not found", file=sys.stderr) + sys.exit(1) + # Upsert the repo first + repos = fetch_repos(conn, args.org, now) + target = None + for r in repos: + if r["name"] == args.repo: + target = r + break + if not target: + target = repo_data + skipped = process_repo(conn, target, args.org, now, args.since, args.skip_commits, args.full) + all_skipped.extend(skipped) + else: + # Fetch all repos + repos = fetch_repos(conn, args.org, now) + conn.commit() + for repo in repos: + try: + skipped = process_repo(conn, repo, args.org, now, args.since, args.skip_commits, args.full) + all_skipped.extend(skipped) + except KeyboardInterrupt: + print("\nInterrupted — committing pending data...") + conn.commit() + raise + except Exception as e: + print(f" ERROR processing {repo['name']}: {e}", file=sys.stderr) + all_skipped.append(f"{repo['name']}/*") + conn.commit() + + except KeyboardInterrupt: + print("\nInterrupted — saving progress...") + conn.commit() + finally: + # Post-build + print("\n--- Post-build ---") + result = conn.execute("PRAGMA integrity_check").fetchone()[0] + print(f" integrity_check: {result}") + conn.execute("ANALYZE") + print(" ANALYZE: done") + + conn.execute( + "INSERT INTO _meta (key, value) VALUES ('last_build', ?) 
" + "ON CONFLICT(key) DO UPDATE SET value=excluded.value", + (datetime.now(timezone.utc).isoformat(),), + ) + conn.commit() + conn.close() + + # Summary + print(f"\nGitHub DB written to {DB_PATH}") + if all_skipped: + print(f"\nSkipped due to errors ({len(all_skipped)}):") + for s in all_skipped: + print(f" - {s}") + else: + print("No errors.") + + +if __name__ == "__main__": + main() diff --git a/scripts/fetch_pricing.py b/scripts/fetch_pricing.py new file mode 100644 index 0000000..360ed61 --- /dev/null +++ b/scripts/fetch_pricing.py @@ -0,0 +1,714 @@ +#!/usr/bin/env python3 +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "httpx>=0.27", +# ] +# /// +"""Fetch cloud pricing data into data/pricing.db. + +Builds a separate SQLite database with pricing for AWS services (EC2, S3, EBS, +ELB, NAT Gateway, Data Transfer) and Claude model token pricing. + +AWS: fetches from public pricing bulk JSON files (no auth needed). +Claude: uses published Anthropic per-token pricing (hardcoded, no API needed). +ROSA: uses `oc get nodes` via subprocess (optional, skipped if oc unavailable). + +A discount factor (e.g., 0.85 for 15% off) can be stored via --discount. +All list prices are stored at face value; discount is applied in views. 
+ +Usage: + uv run scripts/fetch_pricing.py # fetch all + uv run scripts/fetch_pricing.py --aws-only # AWS pricing only + uv run scripts/fetch_pricing.py --claude-only # Claude model pricing only + uv run scripts/fetch_pricing.py --rosa-only # ROSA cluster discovery only + uv run scripts/fetch_pricing.py --skip-rosa # skip ROSA (no oc needed) + uv run scripts/fetch_pricing.py --regions us-east-1 # single region + uv run scripts/fetch_pricing.py --discount 0.85 # set 15% discount factor +""" + +import argparse +import json +import sqlite3 +import subprocess +import sys +import tempfile +from datetime import datetime, timezone +from pathlib import Path + +import httpx + +SCRIPT_DIR = Path(__file__).resolve().parent +REPO_ROOT = SCRIPT_DIR.parent +DATA_DIR = REPO_ROOT / "data" +DB_PATH = DATA_DIR / "pricing.db" + +DEFAULT_REGIONS = ["us-east-1", "us-west-2"] + +# Public AWS pricing bulk file URL template (no auth required) +AWS_PRICING_URL = "https://pricing.us-east-1.amazonaws.com/offers/v1.0/aws/{service}/current/{region}/index.json" + +# AWS services to fetch. EBS is NOT a separate service — it's in the EC2 pricing file. +# NAT Gateway pricing is embedded in EC2 or not available per-region — omitted for now. 
+AWS_SERVICES = [ + {"service_code": "AmazonEC2", "label": "EC2", "edl_service": "ec2"}, + {"service_code": "AmazonS3", "label": "S3", "edl_service": "s3"}, + {"service_code": "AWSELB", "label": "ELB", "edl_service": "elb"}, + { + "service_code": "AWSDataTransfer", + "label": "Data Transfer", + "edl_service": "data_transfer", + }, +] + +# Published Anthropic Claude model pricing (per million tokens, USD) +# Source: https://www.anthropic.com/pricing — update when pricing changes +CLAUDE_PRICING = [ + {"model": "claude-opus-4", "input_per_mtok": 15.00, "output_per_mtok": 75.00}, + {"model": "claude-sonnet-4", "input_per_mtok": 3.00, "output_per_mtok": 15.00}, + {"model": "claude-haiku-4", "input_per_mtok": 0.80, "output_per_mtok": 4.00}, + {"model": "claude-3-5-sonnet", "input_per_mtok": 3.00, "output_per_mtok": 15.00}, + {"model": "claude-3-5-haiku", "input_per_mtok": 0.80, "output_per_mtok": 4.00}, + {"model": "claude-3-opus", "input_per_mtok": 15.00, "output_per_mtok": 75.00}, + {"model": "claude-3-sonnet", "input_per_mtok": 3.00, "output_per_mtok": 15.00}, + {"model": "claude-3-haiku", "input_per_mtok": 0.25, "output_per_mtok": 1.25}, +] + + +# --------------------------------------------------------------------------- +# Schema +# --------------------------------------------------------------------------- + +SCHEMA_SQL = """\ +CREATE TABLE IF NOT EXISTS cloud_pricing ( + pricing_id INTEGER PRIMARY KEY, + provider TEXT NOT NULL, + service TEXT NOT NULL, + region TEXT NOT NULL, + sku TEXT, + description TEXT, + instance_type TEXT, + instance_family TEXT, + vcpu INTEGER, + memory_gb REAL, + storage_type TEXT, + usage_type TEXT, + unit TEXT NOT NULL, + price_per_unit REAL NOT NULL, + currency TEXT NOT NULL DEFAULT 'USD', + effective_date TEXT, + model_name TEXT, + tier_start REAL, + tier_end REAL, + fetched_at TEXT NOT NULL, + UNIQUE(provider, service, region, sku, usage_type, unit, tier_start) +); + +CREATE TABLE IF NOT EXISTS rosa_cluster_instance ( + cluster_name 
TEXT NOT NULL, + environment TEXT, + node_name TEXT NOT NULL, + instance_type TEXT NOT NULL, + role TEXT, + availability_zone TEXT, + fetched_at TEXT NOT NULL, + PRIMARY KEY (cluster_name, node_name) +); + +CREATE TABLE IF NOT EXISTS _meta ( + key TEXT PRIMARY KEY, + value TEXT +); + +-- Indexes +CREATE INDEX IF NOT EXISTS idx_pricing_provider_service ON cloud_pricing(provider, service); +CREATE INDEX IF NOT EXISTS idx_pricing_instance_type ON cloud_pricing(instance_type); +CREATE INDEX IF NOT EXISTS idx_pricing_region ON cloud_pricing(region); +CREATE INDEX IF NOT EXISTS idx_pricing_model ON cloud_pricing(model_name); +CREATE INDEX IF NOT EXISTS idx_rosa_instance_type ON rosa_cluster_instance(instance_type); + +-- Views (discount_factor from _meta, defaults to 1.0) +CREATE VIEW IF NOT EXISTS v_ec2_ondemand AS +SELECT instance_type, instance_family, vcpu, memory_gb, region, + price_per_unit AS list_hourly, + ROUND(price_per_unit * COALESCE( + (SELECT CAST(value AS REAL) FROM _meta WHERE key = 'discount_factor'), 1.0 + ), 6) AS hourly_price, + ROUND(price_per_unit * COALESCE( + (SELECT CAST(value AS REAL) FROM _meta WHERE key = 'discount_factor'), 1.0 + ) * 730, 2) AS monthly_price +FROM cloud_pricing +WHERE provider = 'aws' AND service = 'ec2' AND usage_type = 'OnDemand' +ORDER BY instance_family, vcpu; + +CREATE VIEW IF NOT EXISTS v_rosa_estimated_cost AS +SELECT r.cluster_name, r.environment, r.instance_type, r.role, + COUNT(*) AS node_count, + p.price_per_unit AS list_hourly_per_node, + ROUND(COUNT(*) * p.price_per_unit * COALESCE( + (SELECT CAST(value AS REAL) FROM _meta WHERE key = 'discount_factor'), 1.0 + ) * 730, 2) AS estimated_monthly_cost +FROM rosa_cluster_instance r +LEFT JOIN cloud_pricing p ON r.instance_type = p.instance_type + AND p.provider = 'aws' AND p.service = 'ec2' + AND p.usage_type = 'OnDemand' + AND p.region = SUBSTR(r.availability_zone, 1, LENGTH(r.availability_zone) - 1) +GROUP BY r.cluster_name, r.environment, r.instance_type, r.role; + 
+CREATE VIEW IF NOT EXISTS v_vertex_ai_pricing AS +SELECT model_name, description, usage_type, unit, price_per_unit, + tier_start, tier_end +FROM cloud_pricing +WHERE provider = 'anthropic' AND service = 'vertex_ai' +ORDER BY model_name, usage_type; +""" + + +def init_db() -> sqlite3.Connection: + """Create pricing.db and apply schema.""" + conn = sqlite3.connect(str(DB_PATH)) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA journal_mode = WAL") + conn.executescript(SCHEMA_SQL) + return conn + + +# --------------------------------------------------------------------------- +# AWS Pricing — public bulk JSON files (no auth) +# --------------------------------------------------------------------------- + + +def _parse_ec2_product(product: dict) -> dict: + """Extract EC2-specific attributes from a product.""" + attrs = product.get("attributes", {}) + instance_type = attrs.get("instanceType", "") + family = instance_type.split(".")[0] if "." in instance_type else "" + vcpu = None + memory_gb = None + try: + vcpu = int(attrs.get("vcpu", "0")) + except (ValueError, TypeError): + pass + mem_str = attrs.get("memory", "") + if mem_str and "GiB" in mem_str: + try: + memory_gb = float(mem_str.replace(" GiB", "").replace(",", "")) + except (ValueError, TypeError): + pass + return { + "instance_type": instance_type or None, + "instance_family": family or None, + "vcpu": vcpu, + "memory_gb": memory_gb, + "storage_type": None, + } + + +def _parse_ebs_product(product: dict) -> dict: + attrs = product.get("attributes", {}) + return { + "instance_type": None, + "instance_family": None, + "vcpu": None, + "memory_gb": None, + "storage_type": attrs.get("volumeApiName") or attrs.get("volumeType"), + } + + +def _parse_generic_product(_product: dict) -> dict: + return { + "instance_type": None, + "instance_family": None, + "vcpu": None, + "memory_gb": None, + "storage_type": None, + } + + +def _should_include_ec2(attrs: dict) -> bool: + """Filter EC2 products to 
Linux/Shared/OnDemand only.""" + return ( + attrs.get("operatingSystem") == "Linux" + and attrs.get("tenancy") == "Shared" + and attrs.get("preInstalledSw") == "NA" + and attrs.get("capacitystatus") == "Used" + ) + + +def _extract_ondemand_prices(terms: dict, sku: str) -> list[dict]: + """Extract OnDemand price dimensions for a given SKU.""" + results = [] + on_demand = terms.get("OnDemand", {}).get(sku, {}) + for _term_key, term_details in on_demand.items() if isinstance(on_demand, dict) else []: + for _dim_key, dim in term_details.get("priceDimensions", {}).items(): + price_str = dim.get("pricePerUnit", {}).get("USD", "0") + try: + price = float(price_str) + except (ValueError, TypeError): + continue + if price == 0.0: + continue + results.append( + { + "usage_type": "OnDemand", + "unit": dim.get("unit", ""), + "price_per_unit": price, + "description": dim.get("description", ""), + "effective_date": term_details.get("effectiveDate"), + } + ) + return results + + +def fetch_aws_pricing(regions: list[str], now: str) -> list[dict]: + """Fetch AWS pricing from public bulk JSON files (no auth required).""" + all_rows: list[dict] = [] + + for svc in AWS_SERVICES: + service_code = svc["service_code"] + edl_service = svc["edl_service"] + label = svc["label"] + + if edl_service == "ec2": + parse_product = _parse_ec2_product + elif edl_service == "ebs": + parse_product = _parse_ebs_product + else: + parse_product = _parse_generic_product + + for region in regions: + url = AWS_PRICING_URL.format(service=service_code, region=region) + print(f" Fetching {label} pricing for {region}...") + print(f" URL: {url}") + + try: + # Download to temp file to handle large files (EC2 ~150MB/region) + with tempfile.NamedTemporaryFile(suffix=".json", delete=True) as tmp: + with httpx.stream("GET", url, timeout=300, follow_redirects=True) as resp: + resp.raise_for_status() + total = 0 + for chunk in resp.iter_bytes(chunk_size=1024 * 1024): + tmp.write(chunk) + total += len(chunk) + print(f" 
def fetch_claude_pricing(now: str) -> list[dict]:
    """Build cloud_pricing rows for every model listed in ``CLAUDE_PRICING``.

    Each model contributes two rows, in this order: one for input tokens and
    one for output tokens, both priced per 1M tokens in USD. ``now`` is an ISO
    timestamp; it is stored as ``fetched_at`` and its first ten characters
    (the date part) are stored as ``effective_date``.
    """
    # (direction, key holding that direction's price in a CLAUDE_PRICING entry)
    directions = (("input", "input_per_mtok"), ("output", "output_per_mtok"))

    rows: list[dict] = []
    for model in CLAUDE_PRICING:
        name = model["model"]
        for direction, price_key in directions:
            rows.append(
                {
                    "provider": "anthropic",
                    "service": "vertex_ai",
                    "region": "global",
                    "sku": f"{name}-{direction}",
                    "description": f"{name} {direction} tokens (via Vertex AI)",
                    "instance_type": None,
                    "instance_family": None,
                    "vcpu": None,
                    "memory_gb": None,
                    "storage_type": None,
                    "usage_type": f"per-token-{direction}",
                    "unit": "1M tokens",
                    "price_per_unit": model[price_key],
                    "currency": "USD",
                    "effective_date": now[:10],
                    "model_name": name,
                    "tier_start": None,
                    "tier_end": None,
                    "fetched_at": now,
                }
            )
    print(f" {len(rows)} Claude model price points")
    return rows
(FileNotFoundError, subprocess.TimeoutExpired): + return [] + + +def fetch_rosa_instances(now: str) -> list[dict]: + """Discover instance types from ROSA clusters via oc.""" + try: + result = subprocess.run( + ["oc", "version", "--client"], + capture_output=True, + text=True, + timeout=10, + ) + if result.returncode != 0: + print(" oc CLI not available, skipping ROSA discovery", file=sys.stderr) + return [] + except (FileNotFoundError, subprocess.TimeoutExpired): + print(" oc CLI not available, skipping ROSA discovery", file=sys.stderr) + return [] + + contexts = _oc_get_contexts() + if not contexts: + print(" No oc contexts found, skipping ROSA discovery") + return [] + + all_rows: list[dict] = [] + for ctx in contexts: + cluster_name = ctx["cluster_url"] + env = ctx["environment"] + print(f" Discovering nodes in {cluster_name} ({env})...") + + try: + result = subprocess.run( + ["oc", "get", "nodes", "--context", ctx["name"], "-o", "json"], + capture_output=True, + text=True, + timeout=30, + ) + if result.returncode != 0: + print(f" ERROR: {result.stderr.strip()[:200]}", file=sys.stderr) + continue + + nodes_data = json.loads(result.stdout) + for node in nodes_data.get("items", []): + labels = node.get("metadata", {}).get("labels", {}) + node_name = node.get("metadata", {}).get("name", "") + instance_type = labels.get("node.kubernetes.io/instance-type", "unknown") + az = labels.get("topology.kubernetes.io/zone", "") + + role = "worker" + for label_key in labels: + if "master" in label_key or "control-plane" in label_key: + role = "master" + break + if "infra" in label_key: + role = "infra" + break + + all_rows.append( + { + "cluster_name": cluster_name, + "environment": env, + "node_name": node_name, + "instance_type": instance_type, + "role": role, + "availability_zone": az, + "fetched_at": now, + } + ) + + print(f" {len(nodes_data.get('items', []))} nodes") + + except json.JSONDecodeError: + print(" ERROR: failed to parse node JSON", file=sys.stderr) + except 
def upsert_pricing(conn: sqlite3.Connection, rows: list[dict]) -> int:
    """Insert or refresh cloud pricing rows; return the number of rows processed.

    Each row is a dict keyed by the cloud_pricing column names used below.
    A row is identified by (provider, service, region, sku, usage_type, unit,
    tier_start).

    SQLite treats NULLs as distinct inside UNIQUE indexes, so the
    ``ON CONFLICT`` clause can never fire for a row whose ``tier_start`` is
    NULL. Without extra handling every run would append a fresh copy of each
    such row. For those rows the previous copy (or copies left behind by
    earlier runs) is deleted first, matching ``tier_start`` with ``IS NULL``.
    Rows with a concrete ``tier_start`` still go through the regular upsert.

    The caller is responsible for committing.
    """
    count = 0
    for row in rows:
        if row.get("tier_start") is None:
            # NULL never equals NULL for the unique index; replace explicitly.
            conn.execute(
                """DELETE FROM cloud_pricing
                   WHERE provider = :provider
                     AND service = :service
                     AND region = :region
                     AND sku = :sku
                     AND usage_type = :usage_type
                     AND unit = :unit
                     AND tier_start IS NULL""",
                row,
            )
        conn.execute(
            """INSERT INTO cloud_pricing (
                provider, service, region, sku, description,
                instance_type, instance_family, vcpu, memory_gb, storage_type,
                usage_type, unit, price_per_unit, currency, effective_date,
                model_name, tier_start, tier_end, fetched_at
            ) VALUES (
                :provider, :service, :region, :sku, :description,
                :instance_type, :instance_family, :vcpu, :memory_gb, :storage_type,
                :usage_type, :unit, :price_per_unit, :currency, :effective_date,
                :model_name, :tier_start, :tier_end, :fetched_at
            )
            ON CONFLICT(provider, service, region, sku, usage_type, unit, tier_start)
            DO UPDATE SET
                description=excluded.description,
                instance_type=excluded.instance_type,
                instance_family=excluded.instance_family,
                vcpu=excluded.vcpu,
                memory_gb=excluded.memory_gb,
                storage_type=excluded.storage_type,
                price_per_unit=excluded.price_per_unit,
                currency=excluded.currency,
                effective_date=excluded.effective_date,
                model_name=excluded.model_name,
                tier_end=excluded.tier_end,
                fetched_at=excluded.fetched_at""",
            row,
        )
        count += 1
    return count
def main() -> None:
    """Command-line entry point: fetch pricing sources into pricing.db.

    Flags select which sources run (AWS bulk pricing, Claude model pricing,
    ROSA node discovery). ``--discount`` stores a multiplier in ``_meta`` under
    ``discount_factor``. After each successful source a ``last_*_fetch``
    timestamp is recorded in ``_meta``.

    Whatever happens inside the fetch phase, the ``finally`` block runs an
    integrity check and ``ANALYZE``, stamps ``last_build``, commits and closes
    the connection. A keyboard interrupt commits what was written so far and is
    reported in the final warning list instead of being presented as a fully
    successful run.
    """
    parser = argparse.ArgumentParser(description="Fetch cloud pricing into pricing.db")
    parser.add_argument("--aws-only", action="store_true", help="Fetch AWS pricing only")
    parser.add_argument("--claude-only", action="store_true", help="Fetch Claude pricing only")
    parser.add_argument("--rosa-only", action="store_true", help="ROSA discovery only")
    parser.add_argument("--skip-rosa", action="store_true", help="Skip ROSA discovery")
    parser.add_argument(
        "--regions",
        nargs="+",
        default=DEFAULT_REGIONS,
        help=f"AWS regions (default: {' '.join(DEFAULT_REGIONS)})",
    )
    parser.add_argument(
        "--discount",
        type=float,
        default=None,
        help="Discount factor multiplier (e.g., 0.85 = 15%% off). Stored in _meta.",
    )
    args = parser.parse_args()

    # Determine what to fetch: each "--*-only" flag switches the others off.
    fetch_aws = not args.claude_only and not args.rosa_only
    fetch_claude = not args.aws_only and not args.rosa_only
    fetch_rosa = not args.aws_only and not args.claude_only and not args.skip_rosa

    conn = init_db()
    now = datetime.now(timezone.utc).isoformat()
    errors: list[str] = []

    def set_meta(key: str, value: str) -> None:
        """Insert or overwrite one key in the _meta table (no commit)."""
        conn.execute(
            "INSERT INTO _meta (key, value) VALUES (?, ?) "
            "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
            (key, value),
        )

    # Store discount factor if provided
    if args.discount is not None:
        set_meta("discount_factor", str(args.discount))
        conn.commit()
        print(f"Discount factor set to {args.discount}")

    try:
        # --- AWS ---
        if fetch_aws:
            print("\n=== AWS Pricing (public, no auth) ===")
            rows = fetch_aws_pricing(args.regions, now)
            if rows:
                count = upsert_pricing(conn, rows)
                conn.commit()
                print(f" Total: {count} AWS price points upserted")
                set_meta("last_aws_fetch", now)
            else:
                errors.append("AWS: no data fetched")

        # --- Claude ---
        if fetch_claude:
            print("\n=== Claude Model Pricing (Anthropic published rates) ===")
            rows = fetch_claude_pricing(now)
            if rows:
                count = upsert_pricing(conn, rows)
                conn.commit()
                print(f" Total: {count} Claude price points upserted")
                set_meta("last_claude_fetch", now)

        # --- ROSA ---
        if fetch_rosa:
            print("\n=== ROSA Cluster Discovery ===")
            rows = fetch_rosa_instances(now)
            if rows:
                count = upsert_rosa_instances(conn, rows)
                conn.commit()
                print(f" Total: {count} cluster nodes upserted")
                set_meta("last_rosa_fetch", now)
            else:
                errors.append("ROSA: no nodes discovered (oc may not be available)")

        conn.commit()

    except KeyboardInterrupt:
        print("\nInterrupted — saving progress...")
        conn.commit()
        # Without this the summary below would claim every source succeeded.
        errors.append("Interrupted: remaining sources were not fetched")
    finally:
        # Post-build
        print("\n--- Post-build ---")
        result = conn.execute("PRAGMA integrity_check").fetchone()[0]
        print(f" integrity_check: {result}")
        conn.execute("ANALYZE")
        print(" ANALYZE: done")

        set_meta("last_build", datetime.now(timezone.utc).isoformat())
        conn.commit()
        conn.close()

    # Summary
    print(f"\nPricing DB written to {DB_PATH}")
    if errors:
        print(f"\nWarnings ({len(errors)}):")
        for e in errors:
            print(f" - {e}")
    else:
        print("All sources fetched successfully.")
def load_dotenv(path: Path) -> None:
    """Copy KEY=VALUE pairs from a .env file into ``os.environ``.

    Blank lines, ``#`` comment lines and lines without ``=`` are ignored.
    Surrounding whitespace and surrounding double/single quotes are removed
    from the value. Variables that already exist in the environment are left
    untouched. A missing file is silently ignored.
    """
    if not path.exists():
        return

    for raw in path.read_text().splitlines():
        entry = raw.strip()
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        name, _, value = entry.partition("=")
        name = name.strip()
        if not name:
            continue
        # setdefault keeps any value the process environment already has.
        os.environ.setdefault(name, value.strip().strip('"').strip("'"))
def guess_source_type(filename: str) -> str:
    """Guess the catalog ``source_type`` for a file in the data directory.

    The extension is looked up in ``EXTENSION_SOURCE_MAP`` first. For unknown
    extensions the ``file`` utility is asked to sniff the content, and only a
    PDF is recognised that way. Detection is best-effort: if ``file`` exits
    with an error, is not installed, or cannot be launched, the result is
    ``"unknown"`` rather than an exception.
    """
    mapped = EXTENSION_SOURCE_MAP.get(Path(filename).suffix.lower())
    if mapped is not None:
        return mapped

    try:
        result = subprocess.run(
            ["file", "--brief", str(DATA_DIR / filename)],
            capture_output=True,
            text=True,
            check=True,
        )
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing or non-executable `file` binary.
        return "unknown"

    return "pdf" if "PDF" in result.stdout else "unknown"
def parse_catalog(path: Path) -> dict[str, dict]:
    """Read the YAML catalog at ``path`` into a ``{filename: fields}`` mapping.

    This is a deliberately small line-based reader (no PyYAML dependency):
    a line starting with ``- filename:`` opens a new entry, and every later
    ``key: value`` line that is not a ``#`` comment is attached to the entry
    currently open. Surrounding double quotes are stripped from values, and
    the literal ``null`` becomes ``None`` for every key except the
    ``- filename:`` line itself. Lines before the first entry are ignored.
    Returns an empty dict when the file does not exist.
    """
    if not path.exists():
        return {}

    records: list[dict] = []
    for raw in path.read_text().splitlines():
        text = raw.strip()

        if text.startswith("- filename:"):
            _, _, rest = text.partition(":")
            records.append({"filename": rest.strip().strip('"')})
            continue

        if not records or ":" not in text or text.startswith("#"):
            continue

        field, _, rest = text.partition(":")
        value = rest.strip().strip('"')
        records[-1][field.strip()] = None if value == "null" else value

    # Keyed last so that a record is filed under its final filename value.
    return {record["filename"]: record for record in records}
def write_catalog(path: Path, content: str) -> None:
    """Replace the catalog at ``path`` with ``content`` without partial writes.

    If a catalog already exists, its text is first copied to a sibling file
    with the suffix ``.yaml.bak``. The new content is then written to a
    temporary file in the same directory and moved over ``path`` in a single
    replace step. If anything fails along the way the temporary file is
    removed and the exception is re-raised.
    """
    if path.exists():
        path.with_suffix(".yaml.bak").write_text(path.read_text())

    # Same directory as the target so the final replace stays on one filesystem.
    handle, scratch_name = tempfile.mkstemp(dir=path.parent, suffix=".yaml.tmp")
    scratch = Path(scratch_name)
    try:
        with os.fdopen(handle, "w") as out:
            out.write(content)
        scratch.replace(path)
    except BaseException:
        if scratch.exists():
            scratch.unlink()
        raise
Returns the response body as bytes.""" + headers = {"User-Agent": "refresh_catalog/1.0"} + if extra_headers: + headers.update(extra_headers) + # Truncate URL for display (hide long query strings) + display_url = url.split("?")[0] if len(url) > 80 else url + print(f" GET {display_url}") + req = urllib.request.Request(url, headers=headers) + with urllib.request.urlopen(req, timeout=60) as resp: + data = resp.read() + print(f" -> {resp.status} ({len(data)} bytes)") + return data + + +def _jira_search( + jql: str, + jira_url: str, + jira_username: str, + jira_token: str, + fields: str, + max_results: int = 1000, + expand: str | None = None, + next_page_token: str | None = None, +) -> dict: + """Single Jira Cloud REST search request. Returns parsed JSON response. + + Uses the /rest/api/3/search/jql endpoint with token-based pagination. + """ + param_dict: dict[str, str | int] = { + "jql": jql, + "maxResults": max_results, + "fields": fields, + } + if expand: + param_dict["expand"] = expand + if next_page_token: + param_dict["nextPageToken"] = next_page_token + params = urllib.parse.urlencode(param_dict) + url = f"{jira_url.rstrip('/')}/rest/api/3/search/jql?{params}" + auth_header = "Basic " + base64.b64encode(f"{jira_username}:{jira_token}".encode()).decode() + req = urllib.request.Request( + url, + headers={ + "Authorization": auth_header, + "Accept": "application/json", + "Accept-Encoding": "gzip", + "User-Agent": "refresh_catalog/1.0", + }, + ) + try: + resp = urllib.request.urlopen(req, timeout=120) + except urllib.error.HTTPError as exc: + raise RuntimeError(f"Jira API returned HTTP {exc.code}") from None + except urllib.error.URLError as exc: + raise RuntimeError(f"Jira API request failed: {exc.reason}") from None + with resp: + raw = resp.read() + if resp.headers.get("Content-Encoding") == "gzip": + import gzip + + raw = gzip.decompress(raw) + return json.loads(raw) + + +def fetch_jira_csv(jql: str, jira_url: str, jira_username: str, jira_token: str) -> bytes: + 
"""Paginated Jira Cloud REST search -> CSV bytes. + + Uses token-based pagination via /rest/api/3/search/jql. + Results are deduped by issue key. + """ + jira_fields = "summary,status,priority,assignee,reporter,issuetype,created,updated,labels,components" + max_results = 1000 + + print(f" GET {jira_url.rstrip('/')}/rest/api/3/search/jql (page: 1)") + data = _jira_search( + jql, + jira_url, + jira_username, + jira_token, + jira_fields, + max_results=max_results, + ) + issues = data.get("issues", []) + seen_keys: set[str] = set() + all_issues: list[dict] = [] + + for issue in issues: + key = issue.get("key", "") + if key not in seen_keys: + seen_keys.add(key) + all_issues.append(issue) + + page = 1 + print(f" -> {len(issues)} issues ({len(all_issues)} total)") + + while not data.get("isLast", True) and data.get("nextPageToken"): + page += 1 + print(f" GET {jira_url.rstrip('/')}/rest/api/3/search/jql (page: {page})") + data = _jira_search( + jql, + jira_url, + jira_username, + jira_token, + jira_fields, + max_results=max_results, + next_page_token=data["nextPageToken"], + ) + issues = data.get("issues", []) + + for issue in issues: + key = issue.get("key", "") + if key not in seen_keys: + seen_keys.add(key) + all_issues.append(issue) + + print(f" -> {len(issues)} issues ({len(all_issues)} total)") + + print(f" Total: {len(all_issues)} unique issues ({page} pages)") + + # Flatten to CSV + buf = io.StringIO() + writer = csv.DictWriter(buf, fieldnames=JIRA_CSV_FIELDS) + writer.writeheader() + for issue in all_issues: + fields = issue.get("fields", {}) + row = { + "key": issue.get("key", ""), + "summary": fields.get("summary", ""), + "status": (fields.get("status") or {}).get("name", ""), + "priority": (fields.get("priority") or {}).get("name", ""), + "assignee": (fields.get("assignee") or {}).get("displayName", ""), + "reporter": (fields.get("reporter") or {}).get("displayName", ""), + "issuetype": (fields.get("issuetype") or {}).get("name", ""), + "created": 
def refresh_sources(entries: list[dict], aliases_data: dict, dry_run: bool, only: str | None = None) -> list[dict]:
    """Fetch fresh data for catalog entries that have upstream sources.

    An entry is refreshable when it is a Jira entry (``source_type == "jira"``
    with a ``jql_alias``) or a URL entry (``source_url`` set and
    ``source_type`` of ``web-pdf`` or ``csv``). Fetched content is compared by
    SHA-256 with the file on disk; unchanged files are left alone. Changed
    content is written atomically under a filename re-prefixed with today's
    date, the previous dated file is removed, and the entry's ``filename`` and
    ``date`` are updated in place.

    Args:
        entries: Catalog entries; mutated in place when a file is refreshed.
        aliases_data: Parsed JQL alias config with ``aliases`` and
            ``dynamic_aliases`` mappings. Dynamic aliases are skipped.
        dry_run: Only print what would be refreshed; fetch and write nothing.
        only: If set, only entries whose filename or source_type contains
            this substring (case-insensitive) are considered.

    Returns:
        The same ``entries`` list.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    static_aliases = aliases_data.get("aliases", {})
    dynamic_alias_keys = set(aliases_data.get("dynamic_aliases", {}).keys())
    pattern = only.lower() if only else None

    jira_url = os.environ.get("JIRA_URL", "")
    jira_username = os.environ.get("JIRA_USERNAME", "")
    jira_token = os.environ.get("JIRA_API_TOKEN", "")
    print(
        f" Jira credentials: URL={'(set)' if jira_url else '(not set)'}, "
        f"USERNAME={'(set)' if jira_username else '(not set)'}, "
        f"API_TOKEN={'(set)' if jira_token else '(not set)'}"
    )

    refreshed = 0
    unchanged = 0
    errors = 0

    for entry in entries:
        # The catalog parser stores `source_type: null` as None, so a plain
        # .get() default is not enough to guarantee a string here.
        source_type = entry.get("source_type") or ""
        source_url = entry.get("source_url")
        jql_alias = entry.get("jql_alias")
        filename = entry["filename"]

        # Apply --only filter
        if pattern:
            if pattern not in filename.lower() and pattern not in source_type.lower():
                continue

        # Determine if this entry is refreshable
        is_jira = source_type == "jira" and jql_alias
        is_url = source_url and source_type in ("web-pdf", "csv")
        if not is_jira and not is_url:
            continue

        if dry_run:
            if is_jira:
                print(f" Would refresh (jira/{jql_alias}): {filename}")
            else:
                print(f" Would refresh ({source_type}): {filename}")
            continue

        # Fetch content
        try:
            if is_jira:
                if not jira_url or not jira_username or not jira_token:
                    print(f" ERROR: JIRA_URL, JIRA_USERNAME, and JIRA_API_TOKEN env vars required for: {filename}")
                    errors += 1
                    continue
                if jql_alias in dynamic_alias_keys:
                    print(f" SKIP (dynamic alias '{jql_alias}'): {filename}")
                    continue
                if jql_alias not in static_aliases:
                    print(f" ERROR: jql_alias '{jql_alias}' not found in jql-aliases.json: {filename}")
                    errors += 1
                    continue
                jql = static_aliases[jql_alias]["jql"]
                new_data = fetch_jira_csv(jql, jira_url, jira_username, jira_token)
            else:
                new_data = fetch_url(source_url)
        except Exception as exc:
            # Sanitize error message to avoid leaking credentials in tracebacks
            err_msg = str(exc)
            for secret in (jira_token, jira_username):
                if secret:
                    err_msg = err_msg.replace(secret, "***")
            print(f" ERROR fetching {filename}: {err_msg}")
            errors += 1
            continue

        # Validate -- refuse to write empty content
        if not new_data:
            print(f" ERROR: empty response for {filename}, skipping")
            errors += 1
            continue

        # Diff against existing file
        existing_path = DATA_DIR / filename
        if existing_path.exists() and content_hash(new_data) == file_hash(existing_path):
            print(f" Unchanged: {filename}")
            unchanged += 1
            continue

        # Compute new filename with today's date
        match = DATE_PREFIX_RE.match(filename)
        name_part = match.group(2) if match else filename
        new_filename = f"{today}-{name_part}"
        new_path = DATA_DIR / new_filename

        # Back up old file if we'd overwrite the same name
        if new_path.exists() and new_path == existing_path:
            bak_path = new_path.with_suffix(new_path.suffix + ".bak")
            bak_path.write_bytes(new_path.read_bytes())

        # Write new file atomically
        atomic_write_bytes(new_path, new_data)

        # Remove old dated file if different from new path
        if existing_path.exists() and existing_path != new_path:
            existing_path.unlink()

        # Update catalog entry
        entry["filename"] = new_filename
        entry["date"] = today
        print(f" Refreshed: {filename} -> {new_filename} ({len(new_data)} bytes)")
        refreshed += 1

    print(f"Refresh: {refreshed} updated, {unchanged} unchanged, {errors} errors")
    return entries
def main() -> None:
    """Command-line entry point: optionally refresh sources, then rebuild the catalog.

    Steps, in order:
      0. With ``--refresh``, run ``refresh_sources`` over the parsed catalog
         entries (a missing JQL alias file only disables Jira sources).
      1. Rename data files via ``rename_files``.
      2. Re-key catalog entries so their metadata follows those renames.
      3. Rebuild the entry list from the data files on disk: known files keep
         their entry, new files get a blank entry, and entries whose file is
         gone are dropped unless they can still be refreshed from upstream.
      4. Render the catalog and write it if its content changed.

    With ``--dry-run`` the catalog is not written; the flag is also passed
    through to ``rename_files`` and ``refresh_sources``.
    """
    parser = argparse.ArgumentParser(description="Refresh DATA_CATALOG.yaml from the filesystem.")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print what would change without modifying anything.",
    )
    parser.add_argument(
        "--refresh",
        action="store_true",
        help="Fetch fresh data from upstream sources before cataloging.",
    )
    parser.add_argument(
        "--only",
        metavar="PATTERN",
        help="Only refresh entries whose filename or source_type contains PATTERN (substring match).",
    )
    args = parser.parse_args()

    load_dotenv(DOTENV_PATH)

    # Step 0 (optional): Refresh data from upstream sources
    existing_catalog = parse_catalog(CATALOG_PATH)
    if args.refresh:
        try:
            aliases_data = load_jql_aliases(JQL_ALIASES_PATH)
        except FileNotFoundError as exc:
            print(f"WARNING: {exc} -- skipping Jira sources")
            aliases_data = {"aliases": {}, "dynamic_aliases": {}}

        # refresh_sources mutates the entry dicts in place, so re-key the
        # catalog afterwards to pick up any changed filenames.
        refresh_entries = list(existing_catalog.values())
        refresh_sources(refresh_entries, aliases_data, dry_run=args.dry_run, only=args.only)
        existing_catalog = {e["filename"]: e for e in refresh_entries}

    # Step 1: Rename files to match naming convention
    renames = rename_files(dry_run=args.dry_run)
    print(f"Renames: {len(renames)} file(s)" if renames else "Renames: none needed")

    # Step 2: Re-key existing catalog entries to match new filenames
    # so that manually-entered metadata is preserved across renames.
    for old_name, new_name in renames.items():
        if old_name in existing_catalog:
            entry = existing_catalog.pop(old_name)
            entry["filename"] = new_name
            existing_catalog[new_name] = entry

    # Step 3: Build the catalog from current filesystem state
    # In dry-run mode, simulate the renames so the catalog reflects final state.
    reverse_renames = {v: k for k, v in renames.items()}
    disk_files = sorted(f.name for f in DATA_DIR.iterdir() if is_data_file(f))
    if args.dry_run and renames:
        data_files = sorted(renames.get(f, f) for f in disk_files)
    else:
        data_files = disk_files

    entries = []
    added = []
    removed = []

    for fname in data_files:
        match = DATE_PREFIX_RE.match(fname)
        date = match.group(1) if match else None

        if fname in existing_catalog:
            entry = existing_catalog[fname]
            entry["filename"] = fname
            if date:
                entry["date"] = date
        else:
            entry = {
                "filename": fname,
                "date": date,
                "description": None,
                # Guess from the pre-rename name when the file was renamed.
                "source_type": guess_source_type(reverse_renames.get(fname, fname)),
                "source_url": None,
                "jql_alias": None,
                "notes": None,
            }
            added.append(fname)

        entries.append(entry)

    refreshable_types = {"jira", "web-pdf", "csv"}
    # Build the filename set once (it was rebuilt on every iteration) and
    # keep it current as stale-but-refreshable entries are re-added.
    listed = {e["filename"] for e in entries}
    for old_name, old_entry in existing_catalog.items():
        if old_name in listed:
            continue
        # Keep entries that can still be refreshed -- the file may not exist
        # yet because it hasn't been fetched successfully.  Jira entries are
        # refreshed through their jql_alias rather than a source_url, so an
        # alias alone is enough to keep them (mirrors refresh_sources).
        source_type = old_entry.get("source_type")
        has_url_source = bool(old_entry.get("source_url")) and source_type in refreshable_types
        has_jira_alias = source_type == "jira" and bool(old_entry.get("jql_alias"))
        if has_url_source or has_jira_alias:
            entries.append(old_entry)
            listed.add(old_entry["filename"])
        else:
            removed.append(old_name)

    has_changes = bool(renames or added or removed)

    # Render new catalog content and compare to existing
    new_content = render_catalog(entries)
    old_content = CATALOG_PATH.read_text() if CATALOG_PATH.exists() else ""
    content_changed = new_content != old_content

    if not args.dry_run and content_changed:
        write_catalog(CATALOG_PATH, new_content)

    # Summary
    print(f"Catalog: {len(entries)} files")
    for fname in added:
        print(f" + {fname}")
    for fname in removed:
        print(f" - {fname}")
    if not has_changes:
        print(" Catalog reformatted (no file changes)" if content_changed else " No changes")

    if args.dry_run:
        print("\n(dry run -- no changes made)")


if __name__ == "__main__":
    main()
# check_optional_db LABEL TITLE DB_PATH TABLE...
#
# Sanity-checks an optional SQLite database: one PRAGMA integrity_check, then
# one "SELECT 1" probe per TABLE. Does nothing when DB_PATH is not a file.
# The path and table name reach Python through argv instead of being spliced
# into the -c source, so a repo path containing a quote or backslash cannot
# break (or alter) the generated Python.
# NOTE(review): assumes `run LABEL CMD...` executes CMD with its arguments
# passed through unchanged -- confirm against the definition of `run`.
check_optional_db() {
    local label="$1" title="$2" db="$3"
    shift 3

    [ -f "$db" ] || return 0

    echo ""
    echo "--- $title ---"
    run "$label: integrity_check" uv run python3 -c '
import sqlite3, sys
conn = sqlite3.connect(sys.argv[1])
result = conn.execute("PRAGMA integrity_check").fetchone()[0]
sys.exit(0 if result == "ok" else 1)
' "$db"

    local table
    for table in "$@"; do
        run "$label: $table exists" uv run python3 -c '
import sqlite3, sys
conn = sqlite3.connect(sys.argv[1])
conn.execute("SELECT 1 FROM " + sys.argv[2] + " LIMIT 1")
' "$db" "$table"
    done
}

# Pricing DB (optional)
PRICING_DB="$REPO_ROOT/data/pricing.db"
check_optional_db "pricing" "Pricing DB" "$PRICING_DB" \
    cloud_pricing rosa_cluster_instance _meta

# GitHub DB (optional)
GITHUB_DB="$REPO_ROOT/data/github.db"
check_optional_db "github" "GitHub DB" "$GITHUB_DB" \
    gh_repo gh_commit gh_pull_request gh_issue gh_user _meta

# Summary
echo ""
echo "=== Results: $PASS passed, $FAIL failed ==="