diff --git a/src/psrt_ghsa_bot/app.py b/src/psrt_ghsa_bot/app.py index fa58fbe..7c7f248 100644 --- a/src/psrt_ghsa_bot/app.py +++ b/src/psrt_ghsa_bot/app.py @@ -3,15 +3,17 @@ import base64 import csv import datetime +import json import os import re import typing +import urllib.parse import urllib3 from cvelib.cve_api import CveApi from dotenv import load_dotenv from githubkit import AppAuthStrategy, GitHub -from githubkit.exception import RequestFailed +from githubkit.exception import RequestFailed, RequestError load_dotenv() @@ -56,19 +58,48 @@ def get_repository_advisories( repo: str, ) -> typing.Iterable[dict[str, typing.Any]]: """Lists repository security advisories using the REST API.""" - from githubkit.exception import RequestFailed - import json try: - # Use direct request instead of paginate to avoid validation issues + # We can't use the GitHubKit pagination helper because + # of Pydantic validation issues. We manually paginate instead. response = github.rest.security_advisories.list_repository_advisories( owner=owner, repo=repo, + per_page=100, ) - # Parse JSON directly to bypass Pydantic validation - advisories = json.loads(response.content) - for advisory in advisories: - yield advisory + next_url = None + while True: + # Parse JSON directly to bypass Pydantic validation + advisories = json.loads(response.content) + for advisory in advisories: + yield advisory + + # Find the 'Next' URL for pagination. If the + # URL exists we need to use this URL as the + # 'list_security_advisories()' URL is completely + # different to the 'Next' URL and the GitHub API blows up + # if we pass '?after=...' 
def github_client_request(
    client: typing.Any,
    method: str,
    url: str,
    params: dict[str, str | int],
) -> typing.Any:
    """Issue a raw HTTP request through a GitHub REST API client.

    The request is sent with the client's own ``X-GitHub-Api-Version``
    header so that it matches the requests the client makes itself.

    Args:
        client: Object exposing ``_REST_API_VERSION`` and ``_github``
            attributes (for example ``github.rest.security_advisories``).
        method: HTTP method name, such as ``"GET"``.
        url: URL to request, without the query parameters in ``params``.
        params: Query parameters forwarded with the request.

    Returns:
        Whatever ``client._github.request()`` returns.
    """
    # Pin the REST API version to the one the client itself declares.
    version_header = {"X-GitHub-Api-Version": client._REST_API_VERSION}
    send = client._github.request
    # NOTE(review): both statuses are mapped to RequestError; confirm this
    # is the type the underlying request() expects as an error model.
    models_by_status = {
        "400": RequestError,
        "404": RequestError,
    }
    return send(
        method,
        url,
        params=params,
        headers=version_header,
        error_models=models_by_status,
    )