Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 108 additions & 4 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from urllib.parse import parse_qs, urlparse

import requests
from google.api_core import exceptions as api_exceptions
from google.api_core.client_options import ClientOptions
from google.auth.credentials import AnonymousCredentials
from google.cloud import bigquery
Expand Down Expand Up @@ -518,10 +519,100 @@ def transform_data(raw_data: list[dict], repo: str) -> dict:
return transformed_data


def snapshot_exists(
    client: bigquery.Client,
    dataset_id: str,
    repo: str,
    snapshot_date: str,
) -> bool:
    """
    Return True when BigQuery already holds rows for (repo, snapshot_date).

    The pull_requests table serves as a sentinel: any matching row there means
    this repo was already processed for the given snapshot date.

    Args:
        client: BigQuery client instance
        dataset_id: BigQuery dataset ID
        repo: Repository in "owner/repo" format
        snapshot_date: Snapshot date string in YYYY-MM-DD format

    Returns:
        True if data already exists, False otherwise
    """
    sentinel_query = f"""
    SELECT 1
    FROM `{client.project}.{dataset_id}.pull_requests`
    WHERE snapshot_date = @snapshot_date
      AND target_repository = @repo
    LIMIT 1
    """
    sentinel_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("snapshot_date", "DATE", snapshot_date),
            bigquery.ScalarQueryParameter("repo", "STRING", repo),
        ]
    )
    try:
        row_iter = client.query(sentinel_query, job_config=sentinel_config).result()
        # Any row at all means the snapshot exists; LIMIT 1 keeps this cheap.
        first_row = next(iter(row_iter), None)
    except api_exceptions.NotFound as e:
        # A missing dataset means BIGQUERY_DATASET is misconfigured — re-raise so
        # the run fails loudly instead of proceeding until insert_rows_json fails.
        if f"datasets/{dataset_id}" in str(e):
            logger.error(
                f"BigQuery dataset '{dataset_id}' not found — check BIGQUERY_DATASET config: {e}"
            )
            raise
        # A missing table is the expected first-run case: no snapshot loaded yet.
        logger.info(
            f"Table pull_requests not found in {dataset_id}, treating as no existing snapshot"
        )
        return False
    return first_row is not None


def delete_existing_snapshot(
    client: bigquery.Client,
    dataset_id: str,
    repo: str,
    snapshot_date: str,
) -> None:
    """
    Remove every row for (repo, snapshot_date) from all snapshot tables.

    Running these DELETEs before a fresh load keeps the pipeline idempotent:
    if an earlier run crashed mid-way and left partial rows behind, a rerun
    wipes the partial write first and then reloads everything cleanly.

    Args:
        client: BigQuery client instance
        dataset_id: BigQuery dataset ID
        repo: Repository in "owner/repo" format
        snapshot_date: Snapshot date string in YYYY-MM-DD format
    """
    # The same (date, repo) parameters apply to every table's DELETE.
    shared_parameters = [
        bigquery.ScalarQueryParameter("snapshot_date", "DATE", snapshot_date),
        bigquery.ScalarQueryParameter("repo", "STRING", repo),
    ]
    for table in ("pull_requests", "commits", "reviewers", "comments"):
        statement = f"""
        DELETE FROM `{client.project}.{dataset_id}.{table}`
        WHERE snapshot_date = @snapshot_date
          AND target_repository = @repo
        """
        # .result() blocks until the DML job finishes, so deletions are
        # guaranteed complete before the caller starts reloading.
        client.query(
            statement,
            job_config=bigquery.QueryJobConfig(query_parameters=shared_parameters),
        ).result()
        logger.info(
            f"Deleted existing snapshot rows from {table} for {repo} on {snapshot_date}"
        )


def load_data(
client: bigquery.Client,
dataset_id: str,
transformed_data: dict,
snapshot_date: str,
) -> None:
"""
Load transformed data to BigQuery using the Python client library.
Expand All @@ -531,15 +622,14 @@ def load_data(
dataset_id: BigQuery dataset ID
transformed_data: Dictionary containing tables ('pull_requests',
'commits', 'reviewers', 'comments') mapped to lists of row dictionaries
snapshot_date: Snapshot date string in YYYY-MM-DD format, computed once by the
caller to avoid date-boundary skew between the existence check and inserts
"""

if not transformed_data:
logger.warning("No data to load, skipping")
return

# Add snapshot date to all rows
snapshot_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")

for table, load_table_data in transformed_data.items():
if not load_table_data:
logger.warning(f"No data to load for table {table}, skipping")
Expand Down Expand Up @@ -644,8 +734,20 @@ def _main() -> int:
)

total_processed = 0
snapshot_date = datetime.now(timezone.utc).strftime("%Y-%m-%d")

for repo in github_repos:
# Delete any existing rows for this (repo, snapshot_date) before loading.
# This makes every run idempotent: if a previous run crashed mid-way and left
# partial data, a rerun will clean up the partial write and reload cleanly.
if snapshot_exists(bigquery_client, bigquery_dataset, repo, snapshot_date):
logger.info(
f"Deleting partial/existing snapshot for {repo} on {snapshot_date} before reload"
)
delete_existing_snapshot(
bigquery_client, bigquery_dataset, repo, snapshot_date
)

# Get (or refresh) the installation access token before processing each repo
if github_jwt:
access_token = get_installation_access_token(
Expand All @@ -665,7 +767,9 @@ def _main() -> int:
transformed_data = transform_data(chunk, repo)

# Load
load_data(bigquery_client, bigquery_dataset, transformed_data)
load_data(
bigquery_client, bigquery_dataset, transformed_data, snapshot_date
)

total_processed += len(chunk)
logger.info(
Expand Down
Loading