diff --git a/main.py b/main.py
index 449e4ad..2bc4f49 100755
--- a/main.py
+++ b/main.py
@@ -17,6 +17,7 @@
 from urllib.parse import parse_qs, urlparse
 
 import requests
+from google.api_core import exceptions as api_exceptions
 from google.api_core.client_options import ClientOptions
 from google.auth.credentials import AnonymousCredentials
 from google.cloud import bigquery
@@ -518,10 +519,117 @@ def transform_data(raw_data: list[dict], repo: str) -> dict:
     return transformed_data
 
 
+def snapshot_exists(
+    client: bigquery.Client,
+    dataset_id: str,
+    repo: str,
+    snapshot_date: str,
+) -> bool:
+    """
+    Check if data already exists in BigQuery for the given repo and snapshot date.
+
+    Queries the pull_requests table as a sentinel — if rows exist there for this
+    (repo, snapshot_date) pair, we treat the repo as already processed for today.
+
+    Args:
+        client: BigQuery client instance
+        dataset_id: BigQuery dataset ID
+        repo: Repository in "owner/repo" format
+        snapshot_date: Snapshot date string in YYYY-MM-DD format
+
+    Returns:
+        True if data already exists, False otherwise
+
+    Raises:
+        google.api_core.exceptions.NotFound: If the dataset itself does not exist
+    """
+    query = f"""
+        SELECT 1
+        FROM `{client.project}.{dataset_id}.pull_requests`
+        WHERE snapshot_date = @snapshot_date
+          AND target_repository = @repo
+        LIMIT 1
+    """
+    job_config = bigquery.QueryJobConfig(
+        query_parameters=[
+            bigquery.ScalarQueryParameter("snapshot_date", "DATE", snapshot_date),
+            bigquery.ScalarQueryParameter("repo", "STRING", repo),
+        ]
+    )
+    try:
+        results = list(client.query(query, job_config=job_config).result())
+    except api_exceptions.NotFound:
+        # The error text is not a stable way to tell a missing dataset from a missing
+        # table, so probe the dataset explicitly instead of matching on the message.
+        try:
+            client.get_dataset(f"{client.project}.{dataset_id}")
+        except api_exceptions.NotFound as e:
+            # A missing dataset means BIGQUERY_DATASET is misconfigured — re-raise so
+            # it fails loudly rather than proceeding silently until the load fails.
+            logger.error(
+                f"BigQuery dataset '{dataset_id}' not found — check BIGQUERY_DATASET config: {e}"
+            )
+            raise
+        # A missing table is expected on first run (no snapshot yet).
+        logger.info(
+            f"Table pull_requests not found in {dataset_id}, treating as no existing snapshot"
+        )
+        return False
+    return bool(results)
+
+
+def delete_existing_snapshot(
+    client: bigquery.Client,
+    dataset_id: str,
+    repo: str,
+    snapshot_date: str,
+) -> None:
+    """
+    Delete all rows for (repo, snapshot_date) across all tables before a fresh load.
+
+    This makes loads idempotent: if a previous run crashed mid-way and left partial
+    data, a rerun will clean up the partial write and reload everything cleanly.
+
+    pull_requests is the sentinel that snapshot_exists() checks, so it is deleted
+    last: if this cleanup is itself interrupted, the sentinel rows survive and the
+    next run retries the cleanup instead of loading on top of leftover child rows.
+
+    NOTE(review): BigQuery rejects DML over rows still in the streaming buffer. If
+    load_data writes through streaming inserts, a rerun shortly after a crashed run
+    can fail here — confirm the load method.
+
+    Args:
+        client: BigQuery client instance
+        dataset_id: BigQuery dataset ID
+        repo: Repository in "owner/repo" format
+        snapshot_date: Snapshot date string in YYYY-MM-DD format
+    """
+    # Child tables first, sentinel last (see docstring).
+    tables = ["commits", "reviewers", "comments", "pull_requests"]
+    # The parameters are identical for every table, so build the config once.
+    job_config = bigquery.QueryJobConfig(
+        query_parameters=[
+            bigquery.ScalarQueryParameter("snapshot_date", "DATE", snapshot_date),
+            bigquery.ScalarQueryParameter("repo", "STRING", repo),
+        ]
+    )
+    for table in tables:
+        dml = f"""
+            DELETE FROM `{client.project}.{dataset_id}.{table}`
+            WHERE snapshot_date = @snapshot_date
+              AND target_repository = @repo
+        """
+        client.query(dml, job_config=job_config).result()
+        logger.info(
+            f"Deleted existing snapshot rows from {table} for {repo} on {snapshot_date}"
+        )
+
+
 def load_data(
     client: bigquery.Client,
     dataset_id: str,
     transformed_data: dict,
+    snapshot_date: str,
 ) -> None:
     """
     Load transformed data to BigQuery using the Python client library.
@@ -531,15 +639,14 @@ def load_data(
         dataset_id: BigQuery dataset ID
         transformed_data: Dictionary containing tables ('pull_requests', 'commits',
             'reviewers', 'comments') mapped to lists of row dictionaries
+        snapshot_date: Snapshot date string in YYYY-MM-DD format, computed once by the
+            caller to avoid date-boundary skew between the existence check and inserts
     """
 
     if not transformed_data:
         logger.warning("No data to load, skipping")
         return
 
-    # Add snapshot date to all rows
-    snapshot_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
-
     for table, load_table_data in transformed_data.items():
         if not load_table_data:
             logger.warning(f"No data to load for table {table}, skipping")
@@ -644,8 +751,20 @@ def _main() -> int:
     )
 
     total_processed = 0
+    snapshot_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")
 
     for repo in github_repos:
+        # Delete any existing rows for this (repo, snapshot_date) before loading.
+        # This makes every run idempotent: if a previous run crashed mid-way and left
+        # partial data, a rerun will clean up the partial write and reload cleanly.
+        if snapshot_exists(bigquery_client, bigquery_dataset, repo, snapshot_date):
+            logger.info(
+                f"Deleting partial/existing snapshot for {repo} on {snapshot_date} before reload"
+            )
+            delete_existing_snapshot(
+                bigquery_client, bigquery_dataset, repo, snapshot_date
+            )
+
         # Get (or refresh) the installation access token before processing each repo
         if github_jwt:
             access_token = get_installation_access_token(
@@ -665,7 +784,9 @@ def _main() -> int:
             transformed_data = transform_data(chunk, repo)
 
             # Load
-            load_data(bigquery_client, bigquery_dataset, transformed_data)
+            load_data(
+                bigquery_client, bigquery_dataset, transformed_data, snapshot_date
+            )
 
             total_processed += len(chunk)
             logger.info(