Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 54 additions & 8 deletions src/psrt_ghsa_bot/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@
import base64
import csv
import datetime
import json
import os
import re
import typing
import urllib.parse

import urllib3
from cvelib.cve_api import CveApi
from dotenv import load_dotenv
from githubkit import AppAuthStrategy, GitHub
from githubkit.exception import RequestFailed
from githubkit.exception import RequestFailed, RequestError

load_dotenv()

Expand Down Expand Up @@ -56,26 +58,70 @@ def get_repository_advisories(
repo: str,
) -> typing.Iterable[dict[str, typing.Any]]:
"""Lists repository security advisories using the REST API."""
from githubkit.exception import RequestFailed
import json

try:
# Use direct request instead of paginate to avoid validation issues
# We can't use the GitHubKit pagination helper because
# of Pydantic validation issues. We manually paginate instead.
response = github.rest.security_advisories.list_repository_advisories(
owner=owner,
repo=repo,
per_page=100,
)
# Parse JSON directly to bypass Pydantic validation
advisories = json.loads(response.content)
for advisory in advisories:
yield advisory
next_url = None
while True:
# Parse JSON directly to bypass Pydantic validation
advisories = json.loads(response.content)
for advisory in advisories:
yield advisory

# Find the 'Next' URL for pagination. If the
# URL exists we need to use this URL as the
# 'list_security_advisories()' URL is completely
# different to the 'Next' URL and the GitHub API blows up
# if we pass '?after=...' as a parameter to
# 'list_security_advisories()' :shrug:
link_header = response.headers.get("Link", "")
if mat := re.search(r'<([^>]+)>; rel="next"', link_header):
next_url = mat.group(1)
if not next_url:
break

# To avoid double-percent-quoting the 'after' parameter in
# our request we unquote the value first before
# sending it back into an HTTP client.
after = urllib.parse.unquote(re.search(r"after=([^&]+)", next_url).group(1))
# Remove params, we add them back in the request.
next_url = next_url.split("?", 1)[0]

response = github_client_request(
client=github.rest.security_advisories,
method="GET",
url=next_url,
params={"per_page": 100, "after": after},
)
next_url = None # Reset for next loop.
except RequestFailed as e:
# 404 means no advisories or no access - that's okay
if e.response.status_code == 404:
return
raise


def github_client_request(client: typing.Any, method: str, url: str, params: dict[str, str | int]) -> typing.Any:

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is basically copied from githubkit's "Client" class.

"""Sends a raw HTTP request using a GitHub API client"""
headers = {"X-GitHub-Api-Version": client._REST_API_VERSION}
return client._github.request(
method,
url,
params=params,
headers=headers,
error_models={
"400": RequestError,
"404": RequestError,
},
)


def reserve_one_cve(cve_api: CveApi) -> str:
"""Reserves a single CVE ID"""
resp = cve_api.reserve(count=1, random=True, year=str(datetime.date.today().year))
Expand Down
Loading