Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 80 additions & 1 deletion allowlist-check/check_asf_allowlist.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import fnmatch
import glob
import os
import shlex
import sys
from typing import Any, Generator

Expand Down Expand Up @@ -148,6 +149,54 @@ def is_allowed(action_ref: str, allowlist: list[str]) -> bool:
return any(fnmatch.fnmatch(action_ref, pattern) for pattern in allowlist)


def build_gh_pr_command(action_name: str, refs: list[str], repo_name: str) -> str:
    """Build a shell command that creates a PR adding one action to the allowlist.

    The generated script forks ``apache/infrastructure-actions``, inserts
    pinned version entries into ``actions.yml`` in alphabetical order, and
    opens a pull request — all via the ``gh`` CLI with no manual file editing
    required.

    Every value interpolated into the script is passed through
    ``shlex.quote`` and placed *outside* any surrounding shell quotes, so the
    quoting stays effective even for names containing spaces or other shell
    metacharacters.

    Args:
        action_name: The action name (e.g. ``"owner/action"``).
        refs: Full action refs for this action (e.g. ``["owner/action@sha"]``).
        repo_name: Value of ``$GITHUB_REPOSITORY`` (may be empty).

    Returns:
        str: A copy-pasteable shell script.
    """
    branch = f"allowlist-add-{action_name.replace('/', '-')}"
    title = f"Add {action_name} to the GitHub Actions allowlist"
    commit_message = f"Add {action_name} to allowlist"
    sorted_refs = sorted(refs)

    body_lines = [f"Add `{action_name}` to the allowlist:", ""]
    body_lines.extend(f"- `{ref}`" for ref in sorted_refs)
    if repo_name:
        body_lines.extend(["", f"Needed by: `{repo_name}`"])
    body = "\n".join(body_lines)

    ref_args = " ".join(shlex.quote(ref) for ref in sorted_refs)
    quoted_branch = shlex.quote(branch)

    inserter_url = (
        "https://raw.githubusercontent.com/apache/infrastructure-actions/"
        "main/allowlist-check/insert_actions.py"
    )

    # The --head value is "<login>:<branch>". The command substitution is
    # double-quoted on its own and the shlex-quoted branch is appended after
    # the closing quote; nesting shlex.quote() output inside double quotes
    # would turn its single quotes into literal characters.
    return (
        f"( set -e; _d=$(mktemp -d); trap 'rm -rf \"$_d\"' EXIT; cd \"$_d\"\n"
        f" gh repo fork apache/infrastructure-actions --clone -- --depth=1\n"
        f" cd infrastructure-actions\n"
        f" git checkout -b {quoted_branch}\n"
        f" curl -fsSL {shlex.quote(inserter_url)} | python3 - actions.yml {ref_args}\n"
        f" git add actions.yml\n"
        f" git commit -m {shlex.quote(commit_message)}\n"
        f" git push -u origin {quoted_branch}\n"
        f" gh pr create --repo apache/infrastructure-actions"
        f" --head \"$(gh api user -q .login)\":{quoted_branch}"
        f" --title {shlex.quote(title)}"
        f" --body {shlex.quote(body)} )\n"
    )


def main():
if len(sys.argv) != 2:
print(f"Usage: {sys.argv[0]} <allowlist_path>", file=sys.stderr)
Expand All @@ -158,9 +207,21 @@ def main():
scan_glob = os.environ.get("GITHUB_YAML_GLOB", DEFAULT_GITHUB_YAML_GLOB)
action_refs = collect_action_refs(scan_glob)

print(f"Checking {len(action_refs)} unique action ref(s) against the ASF allowlist:\n")
violations = []
for action_ref, filepaths in sorted(action_refs.items()):
if not is_allowed(action_ref, allowlist):
allowed = is_allowed(action_ref, allowlist)
owner = action_ref.split("/")[0]
if owner in TRUSTED_OWNERS:
reason = f"trusted owner ({owner})"
elif allowed:
reason = "matches allowlist"
else:
reason = "NOT ON ALLOWLIST"
status = "✅" if allowed else "❌"
files_str = ", ".join(filepaths)
print(f" {status} {action_ref} — {reason} ({files_str})")
if not allowed:
for filepath in filepaths:
violations.append((filepath, action_ref))

Expand All @@ -175,6 +236,24 @@ def main():
" the action or version to the allowlist:"
" https://github.com/apache/infrastructure-actions#adding-a-new-action-to-the-allow-list"
)

missing_refs = sorted({ref for _, ref in violations})
repo_name = os.environ.get("GITHUB_REPOSITORY", "")

# Group by action name so we can suggest one PR per action
by_action: dict[str, list[str]] = {}
for ref in missing_refs:
name = ref.split("@")[0]
by_action.setdefault(name, []).append(ref)

print(
"\n::notice::Please create one PR per action."
" You can create the PRs by running the commands below:"
)
for action_name in sorted(by_action):
script = build_gh_pr_command(action_name, by_action[action_name], repo_name)
print(f"\n# {action_name}\n{script}")

sys.exit(1)
else:
print(f"All {len(action_refs)} unique action refs are on the ASF allowlist")
Expand Down
74 changes: 74 additions & 0 deletions allowlist-check/insert_actions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Insert action entries into actions.yml in alphabetical order.

Usage:
python3 insert_actions.py <actions.yml> <ref> [<ref> ...]

Each ``ref`` is an action reference in ``owner/action@version`` format.
New entries are inserted so that the top-level keys in ``actions.yml``
remain sorted case-insensitively. If an action already exists in the
file, the entry is skipped (it will *not* be overwritten).
"""

import re
import sys


def _render_entry(name: str, versions: set[str]) -> str:
    """Render the YAML block for one action, nesting ``keep: true`` under each version."""
    lines = [f"{name}:"]
    for version in sorted(versions):
        # A literal single quote inside a YAML single-quoted scalar is doubled.
        escaped = version.replace("'", "''")
        lines.append(f"  '{escaped}':")
        lines.append("    keep: true")
    return "\n".join(lines)


def _split_top_level_blocks(text: str) -> tuple[str, dict[str, str], str]:
    """Split YAML text into (leading comments, blocks by top-level key, trailing comments).

    Column-0 ``#`` comment lines are never treated as keys: those before the
    first key form the leading header, those between keys travel with the key
    that follows them, and those after the last key are returned separately.
    """
    header = ""
    pending = ""  # comment lines waiting to be attached to the next key
    by_key: dict[str, str] = {}
    last_key = None
    # Split in front of every line that starts in column 0.
    for chunk in re.split(r"(?m)(?=^\S)", text):
        if not chunk.strip():
            continue
        if chunk.startswith("#"):
            # A column-0 comment followed by indented content sits in the
            # middle of the previous block; keep it there.
            continues_block = any(line.strip() for line in chunk.splitlines()[1:])
            if continues_block and last_key is not None:
                by_key[last_key] = f"{by_key[last_key]}\n{pending}{chunk}".rstrip()
                pending = ""
            else:
                pending += chunk
            continue
        if last_key is None:
            header, pending = pending, ""
        last_key = chunk.split(":", 1)[0].strip()
        by_key[last_key] = (pending + chunk).rstrip()
        pending = ""
    return header, by_key, pending


def insert_actions(actions_yml_path: str, refs: list[str]) -> None:
    """Insert *refs* into *actions_yml_path* in alphabetical order.

    Each ref is split on the first ``@`` into an action name and a version
    (``*`` when no version is given). Actions whose name is already a
    top-level key in the file are left untouched; new ones are added with a
    ``keep: true`` mapping under each distinct version. The file is then
    rewritten with its top-level keys sorted case-insensitively, while
    column-0 comment lines keep their relative position.

    Args:
        actions_yml_path: Path of the YAML allowlist file to update in place.
        refs: Action references in ``owner/action@version`` format.
    """
    # Group refs by action name; a set drops refs that were passed twice so
    # the generated YAML never contains duplicate version keys.
    by_action: dict[str, set[str]] = {}
    for ref in refs:
        name, _, version = ref.partition("@")
        by_action.setdefault(name, set()).add(version or "*")

    with open(actions_yml_path, encoding="utf-8") as f:
        header, by_key, trailing = _split_top_level_blocks(f.read())

    # Merge — setdefault keeps existing entries untouched
    for name, versions in by_action.items():
        by_key.setdefault(name, _render_entry(name, versions))

    # Write back sorted
    parts = [header.rstrip()] if header else []
    parts.extend(by_key[key] for key in sorted(by_key, key=str.casefold))
    if trailing:
        parts.append(trailing.rstrip())
    with open(actions_yml_path, "w", encoding="utf-8") as f:
        f.write("\n".join(parts) + "\n")


# Command-line entry point: the first argument is the YAML file, the rest are refs.
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    if len(cli_args) < 2:
        print(f"Usage: {sys.argv[0]} <actions.yml> <ref> [<ref> ...]", file=sys.stderr)
        sys.exit(2)
    yml_path, *new_refs = cli_args
    insert_actions(yml_path, new_refs)
166 changes: 166 additions & 0 deletions allowlist-check/test_check_asf_allowlist.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@
import tempfile
import textwrap
import unittest
from unittest.mock import patch

from check_asf_allowlist import (
build_gh_pr_command,
collect_action_refs,
find_action_refs,
is_allowed,
load_allowlist,
main,
)
from insert_actions import insert_actions


class TestFindActionRefs(unittest.TestCase):
Expand Down Expand Up @@ -303,5 +307,167 @@ def test_no_matching_files(self):
self.assertEqual(refs, {})


class TestInsertActions(unittest.TestCase):
    """Tests for the insert_actions helper script."""

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp()
        # Registered as a cleanup so the directory is removed even on failure.
        self.addCleanup(shutil.rmtree, self.tmpdir)
        self.actions_yml = os.path.join(self.tmpdir, "actions.yml")

    def _write(self, text):
        """Create the allowlist fixture with *text* as its content."""
        with open(self.actions_yml, "w") as f:
            f.write(text)

    def _read(self):
        """Return the allowlist content, closing the file handle afterwards."""
        with open(self.actions_yml) as f:
            return f.read()

    def test_inserts_alphabetically(self):
        self._write(
            "aaa/action:\n  'sha1':\n    keep: true\n"
            "zzz/action:\n  'sha2':\n    keep: true\n"
        )
        insert_actions(self.actions_yml, ["mmm/middle@sha3"])
        top_keys = [
            line
            for line in self._read().splitlines()
            if not line.startswith(" ") and line.endswith(":")
        ]
        self.assertEqual(top_keys, ["aaa/action:", "mmm/middle:", "zzz/action:"])

    def test_does_not_overwrite_existing(self):
        self._write("org/action:\n  'existing-sha':\n    keep: true\n")
        insert_actions(self.actions_yml, ["org/action@new-sha"])
        content = self._read()
        self.assertIn("existing-sha", content)
        self.assertNotIn("new-sha", content)

    def test_multiple_refs_same_action(self):
        self._write("")
        insert_actions(self.actions_yml, ["org/act@sha1", "org/act@sha2"])
        content = self._read()
        self.assertIn("sha1", content)
        self.assertIn("sha2", content)
        self.assertEqual(content.count("org/act:"), 1)

    def test_case_insensitive_sort(self):
        self._write("Bbb/action:\n  'sha1':\n    keep: true\n")
        insert_actions(self.actions_yml, ["aaa/action@sha2"])
        content = self._read()
        self.assertLess(content.index("aaa/action:"), content.index("Bbb/action:"))


class TestBuildGhPrCommand(unittest.TestCase):
    """Checks on the shell script text produced by build_gh_pr_command."""

    def _assert_contains_all(self, script, fragments):
        """Assert that every string in *fragments* occurs in *script*."""
        for fragment in fragments:
            self.assertIn(fragment, script)

    def test_single_action(self):
        generated = build_gh_pr_command(
            "evil-org/evil-action", ["evil-org/evil-action@abc123"], "apache/test-repo"
        )
        self._assert_contains_all(
            generated,
            (
                "gh repo fork apache/infrastructure-actions --clone",
                "allowlist-add-evil-org-evil-action",
                "insert_actions.py",
                "evil-org/evil-action@abc123",
                "gh pr create --repo apache/infrastructure-actions",
                "apache/test-repo",
            ),
        )

    def test_no_repo_name(self):
        generated = build_gh_pr_command(
            "some-org/some-action", ["some-org/some-action@sha1"], ""
        )
        self.assertNotIn("Needed by:", generated)

    def test_multiple_shas_same_action(self):
        generated = build_gh_pr_command(
            "org/action", ["org/action@sha1", "org/action@sha2"], ""
        )
        self._assert_contains_all(
            generated,
            ("org/action@sha1", "org/action@sha2", "allowlist-add-org-action"),
        )

    def test_downloads_inserter_from_raw_github(self):
        """The generated script must download insert_actions.py."""
        expected_url = (
            "https://raw.githubusercontent.com/apache/infrastructure-actions/"
            "main/allowlist-check/insert_actions.py"
        )
        generated = build_gh_pr_command("zoo/action", ["zoo/action@abc123"], "")
        self._assert_contains_all(
            generated, (expected_url, "curl -fsSL", "python3 -")
        )


class TestMainGhPrCommand(unittest.TestCase):
    """Tests that main() prints a gh PR command on violations."""

    def setUp(self):
        self.tmpdir = tempfile.mkdtemp()
        # Registered as a cleanup so the directory is removed even on failure.
        self.addCleanup(shutil.rmtree, self.tmpdir)
        self.github_dir = os.path.join(self.tmpdir, ".github", "workflows")
        os.makedirs(self.github_dir)

        # A workflow using two actions; the allowlist file below is empty.
        filepath = os.path.join(self.github_dir, "ci.yml")
        with open(filepath, "w") as f:
            f.write(
                textwrap.dedent(
                    """\
                    name: CI
                    on: push
                    jobs:
                      build:
                        runs-on: ubuntu-latest
                        steps:
                          - uses: actions/checkout@v4
                          - uses: evil-org/evil-action@abc123
                    """
                )
            )

        self.allowlist_path = os.path.join(self.tmpdir, "allowlist.yml")
        with open(self.allowlist_path, "w") as f:
            f.write("")

    def _run_main(self):
        """Run main() against the fixture and return ``(exit_code, stdout_text)``."""
        scan_glob = os.path.join(self.tmpdir, ".github/**/*.yml")
        with (
            patch.dict(os.environ, {"GITHUB_YAML_GLOB": scan_glob}),
            patch("sys.argv", ["check_asf_allowlist.py", self.allowlist_path]),
            patch("sys.stdout") as mock_stdout,
            self.assertRaises(SystemExit) as cm,
        ):
            main()
        # sys.stdout is replaced by a mock, so the printed text is rebuilt
        # from the recorded write() calls.
        output = "".join(
            call.args[0] for call in mock_stdout.write.call_args_list
        )
        return cm.exception.code, output

    @patch.dict(os.environ, {"GITHUB_REPOSITORY": "apache/test-repo"})
    def test_main_prints_pr_command(self):
        exit_code, output = self._run_main()

        self.assertEqual(exit_code, 1)
        self.assertIn("gh pr create --repo apache/infrastructure-actions", output)
        self.assertIn("evil-org/evil-action", output)
        self.assertIn("apache/test-repo", output)
        self.assertIn("Please create one PR per action", output)

    @patch.dict(os.environ, {"GITHUB_REPOSITORY": "apache/test-repo"})
    def test_main_prints_verbose_check_output(self):
        _, output = self._run_main()

        # Trusted action should show as allowed with reason
        self.assertIn("actions/checkout@v4", output)
        self.assertIn("trusted owner", output)
        # Violation should show as not allowed
        self.assertIn("evil-org/evil-action@abc123", output)
        self.assertIn("NOT ON ALLOWLIST", output)
        # Header line
        self.assertIn("Checking 2 unique action ref(s)", output)


# Run the test suite when this module is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
Loading