From f442c9cb5ad576c9900d281fe30e846b69b20d00 Mon Sep 17 00:00:00 2001 From: David Lawrence Date: Thu, 19 Mar 2026 10:01:21 -0400 Subject: [PATCH 1/2] Bug 2024636 - Add a duplicate check to make sure that we do not insert duplicate rows into big query for the same snapshot_date --- main.py | 50 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/main.py b/main.py index 449e4ad..85f9a6b 100755 --- a/main.py +++ b/main.py @@ -17,6 +17,7 @@ from urllib.parse import parse_qs, urlparse import requests +from google.api_core import exceptions as api_exceptions from google.api_core.client_options import ClientOptions from google.auth.credentials import AnonymousCredentials from google.cloud import bigquery @@ -518,6 +519,47 @@ def transform_data(raw_data: list[dict], repo: str) -> dict: return transformed_data +def snapshot_exists( + client: bigquery.Client, + dataset_id: str, + repo: str, + snapshot_date: str, +) -> bool: + """ + Check if data already exists in BigQuery for the given repo and snapshot date. + + Queries the pull_requests table as a sentinel — if rows exist there for this + (repo, snapshot_date) pair, we treat the repo as already processed for today. + + Args: + client: BigQuery client instance + dataset_id: BigQuery dataset ID + repo: Repository in "owner/repo" format + snapshot_date: Snapshot date string in YYYY-MM-DD format + + Returns: + True if data already exists, False otherwise + """ + query = f""" + SELECT 1 + FROM `{client.project}.{dataset_id}.pull_requests` + WHERE snapshot_date = @snapshot_date + AND target_repository = @repo + LIMIT 1 + """ + job_config = bigquery.QueryJobConfig( + query_parameters=[ + bigquery.ScalarQueryParameter("snapshot_date", "DATE", snapshot_date), + bigquery.ScalarQueryParameter("repo", "STRING", repo), + ] + ) + try: + results = list(client.query(query, job_config=job_config).result()) + return len(results) > 0 + except api_exceptions.NotFound: + return False + + def load_data( client: bigquery.Client, dataset_id: str, @@ -644,8 +686,16 @@ def _main() -> int: ) total_processed = 0 + snapshot_date = datetime.now(timezone.utc).strftime("%Y-%m-%d") for repo in github_repos: + # Skip repos that have already been processed for today's snapshot date + if snapshot_exists(bigquery_client, bigquery_dataset, repo, snapshot_date): + logger.info( + f"Skipping {repo}: data already exists for snapshot_date={snapshot_date}" + ) + continue + # Get (or refresh) the installation access token before processing each repo if github_jwt: access_token = get_installation_access_token( From 39ba9b685c22e8159083b2d860581d3f8ec72577 Mon Sep 17 00:00:00 2001 From: David Lawrence Date: Thu, 19 Mar 2026 16:42:04 -0400 Subject: [PATCH 2/2] Copilot suggested fixes --- main.py | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 62 insertions(+), 8 deletions(-) diff --git a/main.py b/main.py index 85f9a6b..2bc4f49 100755 --- a/main.py +++ b/main.py @@ -556,14 +556,63 @@ def snapshot_exists( try: results = list(client.query(query, job_config=job_config).result()) return len(results) > 0 - except api_exceptions.NotFound: + except api_exceptions.NotFound as e: + # A missing table is expected on first run (no snapshot yet). + # A missing dataset means BIGQUERY_DATASET is misconfigured — re-raise so it + # fails loudly rather than proceeding silently until insert_rows_json fails. + if f"datasets/{dataset_id}" in str(e): + logger.error( + f"BigQuery dataset '{dataset_id}' not found — check BIGQUERY_DATASET config: {e}" + ) + raise + logger.info( + f"Table pull_requests not found in {dataset_id}, treating as no existing snapshot" + ) return False +def delete_existing_snapshot( + client: bigquery.Client, + dataset_id: str, + repo: str, + snapshot_date: str, +) -> None: + """ + Delete all rows for (repo, snapshot_date) across all tables before a fresh load. + + This makes loads idempotent: if a previous run crashed mid-way and left partial + data, a rerun will clean up the partial write and reload everything cleanly. + + Args: + client: BigQuery client instance + dataset_id: BigQuery dataset ID + repo: Repository in "owner/repo" format + snapshot_date: Snapshot date string in YYYY-MM-DD format + """ + tables = ["pull_requests", "commits", "reviewers", "comments"] + for table in tables: + dml = f""" + DELETE FROM `{client.project}.{dataset_id}.{table}` + WHERE snapshot_date = @snapshot_date + AND target_repository = @repo + """ + job_config = bigquery.QueryJobConfig( + query_parameters=[ + bigquery.ScalarQueryParameter("snapshot_date", "DATE", snapshot_date), + bigquery.ScalarQueryParameter("repo", "STRING", repo), + ] + ) + client.query(dml, job_config=job_config).result() + logger.info( + f"Deleted existing snapshot rows from {table} for {repo} on {snapshot_date}" + ) + + def load_data( client: bigquery.Client, dataset_id: str, transformed_data: dict, + snapshot_date: str, ) -> None: """ Load transformed data to BigQuery using the Python client library. @@ -573,15 +622,14 @@ def load_data( dataset_id: BigQuery dataset ID transformed_data: Dictionary containing tables ('pull_requests', 'commits', 'reviewers', 'comments') mapped to lists of row dictionaries + snapshot_date: Snapshot date string in YYYY-MM-DD format, computed once by the + caller to avoid date-boundary skew between the existence check and inserts """ if not transformed_data: logger.warning("No data to load, skipping") return - # Add snapshot date to all rows - snapshot_date = datetime.now(timezone.utc).strftime("%Y-%m-%d") - for table, load_table_data in transformed_data.items(): if not load_table_data: logger.warning(f"No data to load for table {table}, skipping") @@ -689,12 +737,16 @@ def _main() -> int: snapshot_date = datetime.now(timezone.utc).strftime("%Y-%m-%d") for repo in github_repos: - # Skip repos that have already been processed for today's snapshot date + # Delete any existing rows for this (repo, snapshot_date) before loading. + # This makes every run idempotent: if a previous run crashed mid-way and left + # partial data, a rerun will clean up the partial write and reload cleanly. if snapshot_exists(bigquery_client, bigquery_dataset, repo, snapshot_date): logger.info( - f"Skipping {repo}: data already exists for snapshot_date={snapshot_date}" + f"Deleting partial/existing snapshot for {repo} on {snapshot_date} before reload" + ) + delete_existing_snapshot( + bigquery_client, bigquery_dataset, repo, snapshot_date ) - continue # Get (or refresh) the installation access token before processing each repo if github_jwt: @@ -715,7 +767,9 @@ def _main() -> int: transformed_data = transform_data(chunk, repo) # Load - load_data(bigquery_client, bigquery_dataset, transformed_data) + load_data( + bigquery_client, bigquery_dataset, transformed_data, snapshot_date + ) total_processed += len(chunk) logger.info(