From 388c7a11d27f400f40462fbea343aff0e13a3f92 Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Wed, 22 Apr 2026 11:28:54 +0200 Subject: [PATCH 01/13] Removing sec stats and remove-adept-to-close-on-issue-close.ym --- .../remove-adept-to-close-on-issue-close.yml | 73 ------ pyproject.toml | 6 +- src/security/derive_team_security_metrics.py | 208 ------------------ src/security/extract_team_security_stats.py | 206 ----------------- 4 files changed, 1 insertion(+), 492 deletions(-) delete mode 100644 .github/workflows/remove-adept-to-close-on-issue-close.yml delete mode 100644 src/security/derive_team_security_metrics.py delete mode 100644 src/security/extract_team_security_stats.py diff --git a/.github/workflows/remove-adept-to-close-on-issue-close.yml b/.github/workflows/remove-adept-to-close-on-issue-close.yml deleted file mode 100644 index 88acb4d..0000000 --- a/.github/workflows/remove-adept-to-close-on-issue-close.yml +++ /dev/null @@ -1,73 +0,0 @@ -# -# Copyright 2026 ABSA Group Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Reusable workflow – Remove sec:adept-to-close label when an issue is closed. -# -# Called from application repositories via workflow_call. -# The caller must trigger on `issues: [closed]`. -# Note: for `workflow_call`, the called workflow receives the same event payload as the caller, -# so `context.payload` (aka `github.event`) is populated without needing to "forward" it via inputs. - -name: Remove sec:adept-to-close on close - -on: - workflow_call: - -permissions: - issues: write - -jobs: - cleanup-label: - runs-on: ubuntu-latest - steps: - - name: Remove label when conditions match - uses: actions/github-script@ed597411d8f924073f98dfc5c65a23a2325f34cd - with: - script: | - const issue = context.payload.issue; - - // Safety: ignore PRs (they can appear as issues in GitHub UI) - if (issue.pull_request) { - core.info('Skipping: payload refers to a pull request, not an issue.'); - return; - } - - const labels = (issue.labels ?? []) - .map(l => (typeof l === 'string' ? l : l?.name)) - .filter(Boolean); - - const hasScopeSecurity = labels.includes('scope:security'); - const hasTechDebt = labels.includes('type:tech-debt'); - const hasAdeptToClose = labels.includes('sec:adept-to-close'); - - if (!hasScopeSecurity || !hasTechDebt) { - core.info( - `Skipping: required labels missing (scope:security=${hasScopeSecurity}, type:tech-debt=${hasTechDebt}).` - ); - return; - } - - if (!hasAdeptToClose) { - core.info('No-op: label sec:adept-to-close is not present on the issue.'); - return; - } - - await github.rest.issues.removeLabel({ - owner: context.repo.owner, - repo: context.repo.repo, - issue_number: issue.number, - name: 'sec:adept-to-close', - }); diff --git a/pyproject.toml b/pyproject.toml index 957e1a1..be4f01c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,11 +8,7 @@ target-version = ['py314'] force-exclude = '''test''' [tool.coverage.run] -omit = [ - "tests/*", - "src/security/derive_team_security_metrics.py", - "src/security/extract_team_security_stats.py", -] +omit = ["tests/*"] [tool.mypy] check_untyped_defs = true diff --git a/src/security/derive_team_security_metrics.py b/src/security/derive_team_security_metrics.py deleted file mode 100644 index 8316e66..0000000 --- a/src/security/derive_team_security_metrics.py +++ /dev/null @@ -1,208 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright 2026 ABSA Group Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# pylint: skip-file -""" -Derive team security metrics from issue snapshots (Issues-only). - -Resurfacing definition (B): -- A fingerprint is considered 'resurfaced' when its occurrence_count transitions - from 0 in the previous snapshot to >0 in the current snapshot. - -Inputs: -- data/issues_snapshot.json (required) -- data/issues_snapshot.prev.json (optional; if missing, resurfacing cannot be computed) - -Outputs: -- reports/metrics.json -- reports/summary.md (appends derived metrics) -""" - -import json -import logging -import os -from datetime import datetime -from typing import Any, Dict, List, Optional - -SNAPSHOT_CUR = os.environ.get("SNAPSHOT_CURRENT", "data/issues_snapshot.json") -SNAPSHOT_PREV = os.environ.get("SNAPSHOT_PREVIOUS", "data/issues_snapshot.prev.json") - -OUT_METRICS_JSON = os.environ.get("OUT_METRICS_JSON", "reports/metrics.json") -OUT_SUMMARY_MD = os.environ.get("OUT_SUMMARY_MD", "reports/summary.md") - - -def require_env(key: str) -> str: - """Return the value of environment variable *key*, or exit.""" - try: - return os.environ[key] - except KeyError as exc: - raise SystemExit(f"Missing required environment variable: {key}") from exc - - -TEAM_SLUG = require_env("GITHUB_TEAM_SLUG") - - -def _safe_int(v: Any, default: int = 0) -> int: - """Coerce *v* to ``int``, returning *default* on failure.""" - try: - if v is None: - return default - if isinstance(v, int): - return v - s = str(v).strip() - if s == "": - return default - return int(float(s)) - except Exception: - return default - - -def _load_json(path: str) -> Optional[Any]: - """Load and return JSON from *path*, or ``None`` if the file is missing.""" - if not os.path.exists(path): - return None - with open(path, "r", encoding="utf-8") as f: - return json.load(f) - - -def _index_by_fingerprint(snapshot: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: - """Index snapshot items by their secmeta fingerprint.""" - idx: Dict[str, Dict[str, Any]] = {} - for item in snapshot: - fp = (item.get("secmeta") or {}).get("fingerprint") - if not fp: - # If secmeta is missing or malformed, it cannot participate in fingerprint-level stats. - continue - idx[fp] = item - return idx - - -def _severity_from_labels(labels: List[str]) -> str: - """Extract the severity token from ``sec:sev/`` labels.""" - for l in labels: - if l.startswith("sec:sev/"): - return l.split("/", 1)[1] - return "unknown" - - -def main() -> None: - """Derive and write team security metrics from issue snapshots.""" - # TODO decide about changes related to this script - logging.warning( - "This script is deprecated and may be removed in the future. Please refer to the updated documentation for deriving security metrics." - ) - return - - cur = _load_json(SNAPSHOT_CUR) - if cur is None: - raise SystemExit(f"Missing current snapshot: {SNAPSHOT_CUR}") - - if not isinstance(cur, list): - raise SystemExit(f"Current snapshot is not a list: {SNAPSHOT_CUR}") - - prev = _load_json(SNAPSHOT_PREV) - - cur_idx = _index_by_fingerprint(cur) - prev_idx = _index_by_fingerprint(prev) if isinstance(prev, list) else {} - - # Basic counts - total = len(cur) - by_sev: Dict[str, int] = {} - postponed = 0 - needs_review = 0 - - for item in cur: - labels = item.get("labels") or [] - sev = _severity_from_labels(labels) - by_sev[sev] = by_sev.get(sev, 0) + 1 - if "sec:state/postponed" in labels: - postponed += 1 - if "sec:state/needs-review" in labels: - needs_review += 1 - - # Resurfacing (B): prev occurrence_count == 0 and current > 0 - resurfaced: List[Dict[str, Any]] = [] - if prev_idx: - for fp, cur_item in cur_idx.items(): - cur_occ = _safe_int((cur_item.get("secmeta") or {}).get("occurrence_count"), 0) - prev_item = prev_idx.get(fp) - prev_occ = _safe_int(((prev_item or {}).get("secmeta") or {}).get("occurrence_count"), 0) - if prev_item is not None and prev_occ == 0 and cur_occ > 0: - resurfaced.append( - { - "fingerprint": fp, - "repo": cur_item.get("repo"), - "issue_number": cur_item.get("issue_number"), - "title": cur_item.get("title"), - "severity": _severity_from_labels(cur_item.get("labels") or []), - "prev_occurrence_count": prev_occ, - "current_occurrence_count": cur_occ, - } - ) - - metrics = { - "team": TEAM_SLUG, - "generated_at_utc": datetime.utcnow().isoformat() + "Z", - "snapshot_current": SNAPSHOT_CUR, - "snapshot_previous": SNAPSHOT_PREV if prev_idx else None, - "counts": { - "total_security_issues": total, - "postponed": postponed, - "needs_review": needs_review, - "by_severity": dict(sorted(by_sev.items())), - }, - "resurfaced": { - "definition": "B: fingerprint occurrence_count from 0 (previous snapshot) to >0 (current snapshot)", - "count": len(resurfaced), - "items": resurfaced, - }, - } - - os.makedirs(os.path.dirname(OUT_METRICS_JSON), exist_ok=True) - os.makedirs(os.path.dirname(OUT_SUMMARY_MD), exist_ok=True) - - with open(OUT_METRICS_JSON, "w", encoding="utf-8") as f: - json.dump(metrics, f, indent=2) - - # Append to (or create) summary.md - summary_lines: List[str] = [] - summary_lines.append(f"\n## Derived metrics\n") - summary_lines.append(f"Generated at: {metrics['generated_at_utc']}\n") - if metrics["snapshot_previous"] is None: - summary_lines.append("- Resurfacing: not computed (no previous snapshot found)\n") - else: - summary_lines.append(f"- Resurfaced fingerprints (definition B): {metrics['resurfaced']['count']}\n") - if resurfaced: - summary_lines.append("\n### Resurfaced items\n") - for r in resurfaced[:50]: - summary_lines.append( - f"- {r['severity']} {r['repo']}#{r['issue_number']} (occ {r['prev_occurrence_count']} -> {r['current_occurrence_count']}): {r['title']}\n" - ) - if len(resurfaced) > 50: - summary_lines.append(f"- ... and {len(resurfaced) - 50} more\n") - - # Ensure summary exists; if not, create a minimal header. - if not os.path.exists(OUT_SUMMARY_MD): - with open(OUT_SUMMARY_MD, "w", encoding="utf-8") as f: - f.write(f"# Security summary for team `{TEAM_SLUG}`\n\n") - - with open(OUT_SUMMARY_MD, "a", encoding="utf-8") as f: - f.writelines(summary_lines) - - -if __name__ == "__main__": - main() diff --git a/src/security/extract_team_security_stats.py b/src/security/extract_team_security_stats.py deleted file mode 100644 index da1b6ac..0000000 --- a/src/security/extract_team_security_stats.py +++ /dev/null @@ -1,206 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright 2026 ABSA Group Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# pylint: skip-file -""" -Extract security statistics per GitHub team. - -Model: -- Source of truth: GitHub Issues -- Scope: repositories owned by a given GitHub team -- Unit: one Issue = one logical vulnerability (fingerprint) - -Outputs: -- data/issues_snapshot.json -- data/events_flat.csv -- reports/summary.md -""" - -import csv -import json -import logging -import os -import re -from datetime import datetime -from github import Github - - -# -------------------- -# Configuration -# -------------------- -def require_env(key: str) -> str: - """Return the value of environment variable *key*, or exit.""" - try: - return os.environ[key] - except KeyError as exc: - raise SystemExit(f"Missing required environment variable: {key}") from exc - - -GITHUB_TOKEN = require_env("GITHUB_TOKEN") -ORG = require_env("GITHUB_ORG") -TEAM_SLUG = require_env("GITHUB_TEAM_SLUG") - -OUT_DATA = "data" -OUT_REPORTS = "reports" - -SEC_LABEL_PREFIX = "sec:" - -SEC_EVENT_RE = re.compile(r"\[sec-event\](.*?)\[/sec-event\]", re.S) -SECMETA_RE = re.compile(r"```secmeta(.*?)```", re.S) - -# -------------------- -# Helpers -# -------------------- - - -def ensure_dirs(): - """Create output directories if they don't exist.""" - os.makedirs(OUT_DATA, exist_ok=True) - os.makedirs(OUT_REPORTS, exist_ok=True) - - -def parse_kv_block(block: str) -> dict: - """Parse a ``key=value``-per-line block into a dict.""" - data = {} - for line in block.splitlines(): - line = line.strip() - if not line or "=" not in line: - continue - k, v = line.split("=", 1) - data[k.strip()] = v.strip() - return data - - -def parse_secmeta(body: str) -> dict: - """Extract the secmeta key-value block from an issue body.""" - match = SECMETA_RE.search(body or "") - if not match: - return {} - return parse_kv_block(match.group(1)) - - -def parse_events(comments): - """Extract ``[sec-event]`` blocks from issue comments.""" - events = [] - for c in comments: - for raw in SEC_EVENT_RE.findall(c.body or ""): - evt = parse_kv_block(raw) - evt["timestamp"] = c.created_at.isoformat() - events.append(evt) - return events - - -def issue_has_sec_label(issue): - """Return ``True`` if *issue* carries any ``sec:`` prefixed label.""" - return any(l.name.startswith(SEC_LABEL_PREFIX) for l in issue.labels) - - -# -------------------- -# Main extraction -# -------------------- - - -def main(): - """Extract security statistics from GitHub Issues for the configured team.""" - # TODO decide about changes related to this script - logging.warning( - "This script is deprecated and may be removed in the future. Please refer to the updated documentation for deriving security metrics." - ) - return - - ensure_dirs() - - gh = Github(GITHUB_TOKEN) - org = gh.get_organization(ORG) - team = org.get_team_by_slug(TEAM_SLUG) - - repos = list(team.get_repos()) - - snapshot = [] - flat_events = [] - - for repo in repos: - issues = repo.get_issues(state="all") - for issue in issues: - # Skip PRs that may be returned by the issues API - if getattr(issue, "pull_request", None): - continue - - if not issue_has_sec_label(issue): - continue - - secmeta = parse_secmeta(issue.body or "") - events = parse_events(issue.get_comments()) - - snapshot.append( - { - "repo": repo.full_name, - "issue_number": issue.number, - "title": issue.title, - "state": issue.state, - "labels": [l.name for l in issue.labels], - "secmeta": secmeta, - "created_at": issue.created_at.isoformat(), - "updated_at": issue.updated_at.isoformat(), - "event_count": len(events), - } - ) - - for e in events: - fp = secmeta.get("fingerprint") if secmeta else None - if not fp: - continue # ignore events without a fingerprint - flat_events.append( - { - "repo": repo.full_name, - "issue_number": issue.number, - "fingerprint": fp, - "action": e.get("action"), - "reason": e.get("reason"), - "timestamp": e.get("timestamp"), - } - ) - - # Write snapshot - with open(os.path.join(OUT_DATA, "issues_snapshot.json"), "w") as f: - json.dump(snapshot, f, indent=2) - - # Write flat events - with open(os.path.join(OUT_DATA, "events_flat.csv"), "w", newline="") as f: - writer = csv.DictWriter(f, fieldnames=["repo", "issue_number", "fingerprint", "action", "reason", "timestamp"]) - writer.writeheader() - writer.writerows(flat_events) - - # Summary report - total = len(snapshot) - by_sev = {} - - for item in snapshot: - sev = next((l for l in item["labels"] if l.startswith("sec:sev/")), "sec:sev/unknown") - by_sev[sev] = by_sev.get(sev, 0) + 1 - - with open(os.path.join(OUT_REPORTS, "summary.md"), "w") as f: - f.write(f"# Security summary for team `{TEAM_SLUG}`\n\n") - f.write(f"Generated at: {datetime.utcnow().isoformat()} UTC\n\n") - f.write(f"## Total security issues: {total}\n\n") - f.write("## By severity\n\n") - for sev, cnt in sorted(by_sev.items()): - f.write(f"- {sev}: {cnt}\n") - - -if __name__ == "__main__": - main() From 5f6c7d67204cd499d30a478185f570c2f8bc5851 Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Wed, 22 Apr 2026 11:41:30 +0200 Subject: [PATCH 02/13] Removing sec-events (gh comments) --- src/core/github/client.py | 8 +- src/core/github/projects.py | 17 +-- src/security/constants.py | 3 - src/security/issues/sec_events.py | 72 ---------- src/security/issues/sync.py | 92 +------------ tests/security/issues/test_sec_events.py | 163 ----------------------- tests/security/issues/test_sync.py | 60 +-------- tests/security/test_constants.py | 2 - 8 files changed, 8 insertions(+), 409 deletions(-) delete mode 100644 src/security/issues/sec_events.py delete mode 100644 tests/security/issues/test_sec_events.py diff --git a/src/core/github/client.py b/src/core/github/client.py index 12b39ad..3213419 100644 --- a/src/core/github/client.py +++ b/src/core/github/client.py @@ -18,29 +18,25 @@ import logging import subprocess -from collections.abc import Mapping - def run_cmd( cmd: list[str], *, capture_output: bool = True, - env: Mapping[str, str] | None = None, ) -> subprocess.CompletedProcess: """Run *cmd* as a subprocess and return the completed process.""" - return subprocess.run(cmd, check=False, capture_output=capture_output, text=True, env=env) + return subprocess.run(cmd, check=False, capture_output=capture_output, text=True) def run_gh( args: list[str], *, capture_output: bool = True, - env: Mapping[str, str] | None = None, ) -> subprocess.CompletedProcess: """Run a ``gh`` CLI command and return the completed process.""" cmd = ["gh"] + args try: - return run_cmd(cmd, capture_output=capture_output, env=env) + return run_cmd(cmd, capture_output=capture_output) except FileNotFoundError as exc: logging.error("gh CLI not found. Install and authenticate gh.") raise SystemExit(1) from exc diff --git a/src/core/github/projects.py b/src/core/github/projects.py index 7e16f04..4a623f3 100644 --- a/src/core/github/projects.py +++ b/src/core/github/projects.py @@ -21,8 +21,7 @@ import json import logging -import os -from collections.abc import Mapping + from dataclasses import dataclass from typing import Any @@ -52,21 +51,11 @@ class ProjectPriorityField: def _run_graphql(query: str, variables: dict[str, Any] | None = None) -> dict[str, Any] | None: - """Execute a GraphQL query via ``gh api graphql`` and return parsed JSON. - - When ``GH_PROJECT_ONLY_TOKEN`` is set in the environment the GraphQL call is made - with that token instead of the default ``GH_TOKEN``. This allows cross-org - project access while the rest of the pipeline continues to use the scoped - ``github.token``. - """ + """Execute a GraphQL query via ``gh api graphql`` and return parsed JSON.""" args = ["api", "graphql", "-f", f"query={query}"] for k, v in (variables or {}).items(): args += ["-F", f"{k}={v}"] - env: Mapping[str, str] | None = None - project_token = os.environ.get("GH_PROJECT_ONLY_TOKEN", "") - if project_token: - env = {**os.environ, "GH_TOKEN": project_token} - res = run_gh(args, env=env) + res = run_gh(args) if res.returncode != 0: logging.warning(f"GraphQL call failed: {res.stderr}") return None diff --git a/src/security/constants.py b/src/security/constants.py index ed32aed..ae85d38 100644 --- a/src/security/constants.py +++ b/src/security/constants.py @@ -21,9 +21,6 @@ LABEL_EPIC = "epic" LABEL_SEC_ADEPT_TO_CLOSE = "sec:adept-to-close" -SEC_EVENT_OPEN = "open" -SEC_EVENT_REOPEN = "reopen" - SECMETA_TYPE_PARENT = "parent" SECMETA_TYPE_CHILD = "child" diff --git a/src/security/issues/sec_events.py b/src/security/issues/sec_events.py deleted file mode 100644 index 54625f4..0000000 --- a/src/security/issues/sec_events.py +++ /dev/null @@ -1,72 +0,0 @@ -# -# Copyright 2026 ABSA Group Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""``[sec-event]`` comment blocks – parsing, rendering, and stripping -structured lifecycle-event blocks from issue bodies. -""" - -import re - -from security.issues.secmeta import render_kv_lines - - -def parse_sec_event_fields(raw: str) -> dict[str, str]: - """Parse ``key=value`` lines from a raw sec-event block.""" - fields: dict[str, str] = {} - for line in (raw or "").splitlines(): - s = line.strip() - if not s or "=" not in s: - continue - k, v = s.split("=", 1) - fields[k.strip()] = v.strip() - return fields - - -def render_sec_event(fields: dict[str, str]) -> str: - """Render a structured ``[sec-event]`` comment block from *fields*.""" - preferred_order = [ - "action", - "seen_at", - "source", - "gh_alert_number", - "occurrence_fp", - "commit_sha", - "path", - "start_line", - "end_line", - ] - lines = ["[sec-event]"] - lines.extend(render_kv_lines(fields, preferred_order, skip_empty=True)) - lines.append("[/sec-event]") - return "\n".join(lines) + "\n" - - -def strip_sec_events_from_body(body: str) -> str: - """Remove any legacy sec-event content from an issue body. - - - Drops a dedicated '## Security Events' section if present (from previous versions). - - Removes any inline [sec-event] blocks. - """ - - text = body or "" - # Drop everything from the header onward (the section was intended to be last). - m = re.search(r"\n##\s+Security\s+Events\s*\n", text, flags=re.IGNORECASE) - if m: - text = text[: m.start()].rstrip() + "\n" - # Remove any inline blocks. - text = re.compile(r"\[sec-event\]\s*(.*?)\s*\[/sec-event\]", re.S).sub("", text) - text = re.sub(r"\n{3,}", "\n\n", text).strip() + "\n" - return text diff --git a/src/security/issues/sync.py b/src/security/issues/sync.py index f414f0f..c0d076c 100644 --- a/src/security/issues/sync.py +++ b/src/security/issues/sync.py @@ -24,11 +24,10 @@ import logging -from core.helpers import iso_date, normalize_path, utc_today +from core.helpers import iso_date, normalize_path from core.github.issues import ( gh_issue_add_labels, gh_issue_add_sub_issue_by_number, - gh_issue_comment, gh_issue_create, gh_issue_edit_body, gh_issue_edit_state, @@ -47,8 +46,6 @@ LABEL_SEC_ADEPT_TO_CLOSE, LABEL_TYPE_TECH_DEBT, NOT_AVAILABLE, - SEC_EVENT_OPEN, - SEC_EVENT_REOPEN, SECMETA_TYPE_CHILD, SECMETA_TYPE_PARENT, ) @@ -61,7 +58,6 @@ classify_category, ) from .models import AlertContext, IssueIndex, NotifiedIssue, SeverityChange, SyncContext, SyncResult -from .sec_events import render_sec_event, strip_sec_events_from_body from .secmeta import json_list, load_secmeta, parse_json_list, render_secmeta from .templates import PARENT_BODY_TEMPLATE @@ -127,30 +123,11 @@ def maybe_reopen_parent_issue( context, child_issue_number or "", ) - logging.info( - "DRY-RUN: would comment parent reopen sec-event on issue #%d (rule_id=%s)", - parent_issue.number, - rule_id, - ) parent_issue.state = "open" return if gh_issue_edit_state(repo, parent_issue.number, "open"): parent_issue.state = "open" - gh_issue_comment( - repo, - parent_issue.number, - render_sec_event( - { - "action": SEC_EVENT_REOPEN, - "seen_at": utc_today(), - "source": "code_scanning", - "rule_id": rule_id, - "context": context, - "child_issue": str(child_issue_number) if child_issue_number else "", - } - ), - ) def _close_resolved_parent_issues( @@ -288,7 +265,6 @@ def ensure_parent_issue( ).strip() + "\n" ) - rebuilt = strip_sec_events_from_body(rebuilt) # Snapshot the original body on first encounter so we can # defer the API call until all alerts have been processed. @@ -330,24 +306,6 @@ def ensure_parent_issue( if num is None: return None - # Parent lifecycle event (human visible): opened/created. - if dry_run: - logging.info("DRY-RUN: would comment parent open sec-event on issue #%d (rule_id=%s)", num, rule_id) - else: - gh_issue_comment( - repo_full, - num, - render_sec_event( - { - "action": SEC_EVENT_OPEN, - "seen_at": iso_date(alert.metadata.created_at), - "source": "code_scanning", - "rule_id": rule_id, - "severity": alert.metadata.severity, - } - ), - ) - created = Issue(number=num, state="open", title=title, body=body) issues[num] = created index.parent_by_rule_id[rule_id] = created @@ -504,24 +462,6 @@ def _handle_new_child_issue( logging.info("Add sub-issue link parent=#%d child=#%d (alert %d)", parent_issue.number, num, ctx.alert_number) gh_issue_add_sub_issue_by_number(ctx.repo, parent_issue.number, num) - gh_issue_comment( - ctx.repo, - num, - render_sec_event( - { - "action": SEC_EVENT_OPEN, - "seen_at": ctx.first_seen, - "source": "code_scanning", - "gh_alert_number": str(ctx.alert_number), - "occurrence_fp": str(ctx.occurrence_fp), - "commit_sha": str(ctx.commit_sha), - "path": str(ctx.path), - "start_line": str(ctx.start_line or ""), - "end_line": str(ctx.end_line or ""), - } - ), - ) - if sync.priority_sync is not None: sync.priority_sync.enqueue(ctx.repo, num, ctx.severity, sync.severity_priority_map) @@ -638,7 +578,6 @@ def _rebuild_and_apply_child_body( """Render a fresh child body from *secmeta* + template and apply if changed.""" human_body = build_child_issue_body(ctx.alert) new_body = render_secmeta(secmeta) + "\n\n" + human_body - new_body = strip_sec_events_from_body(new_body) if new_body != issue.body: if sync.dry_run: @@ -652,32 +591,6 @@ def _rebuild_and_apply_child_body( issue.body = new_body -def _comment_child_event( - *, - ctx: AlertContext, - sync: SyncContext, - issue: Issue, - reopened: bool, -) -> None: - """Post a reopen sec-event comment on the child issue.""" - if reopened: - if sync.dry_run: - logging.info("DRY-RUN: would comment reopen event on issue #%d (alert %d)", issue.number, ctx.alert_number) - else: - gh_issue_comment( - ctx.repo, - issue.number, - render_sec_event( - { - "action": SEC_EVENT_REOPEN, - "seen_at": utc_today(), - "source": "code_scanning", - "gh_alert_number": str(ctx.alert_number), - } - ), - ) - - def _sync_child_title_and_labels( *, ctx: AlertContext, @@ -758,10 +671,9 @@ def _handle_existing_child_issue( if parent_issue is None and ctx.rule_id: parent_issue = find_parent_issue(sync.index, rule_id=ctx.rule_id) - reopened = _maybe_reopen_child(ctx=ctx, sync=sync, issue=issue, parent_issue=parent_issue) + _maybe_reopen_child(ctx=ctx, sync=sync, issue=issue, parent_issue=parent_issue) secmeta, _ = _merge_child_secmeta(ctx=ctx, issue=issue) _rebuild_and_apply_child_body(ctx=ctx, sync=sync, issue=issue, secmeta=secmeta) - _comment_child_event(ctx=ctx, sync=sync, issue=issue, reopened=reopened) _sync_child_title_and_labels(ctx=ctx, sync=sync, issue=issue) if parent_issue is not None: diff --git a/tests/security/issues/test_sec_events.py b/tests/security/issues/test_sec_events.py deleted file mode 100644 index 849bc29..0000000 --- a/tests/security/issues/test_sec_events.py +++ /dev/null @@ -1,163 +0,0 @@ -# -# Copyright 2026 ABSA Group Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Unit tests for ``utils.sec_events``.""" - -import pytest - -from security.issues.sec_events import ( - parse_sec_event_fields, - render_sec_event, - strip_sec_events_from_body, -) - - -# ===================================================================== -# parse_sec_event_fields -# ===================================================================== - - -def test_basic_kv() -> None: - raw = "action=created\nseen_at=2026-01-01\nsource=aquasec" - result = parse_sec_event_fields(raw) - assert result == { - "action": "created", - "seen_at": "2026-01-01", - "source": "aquasec", - } - -def test_ignores_blank_and_no_equals() -> None: - raw = "action=created\n\njust-text\nfoo=bar" - result = parse_sec_event_fields(raw) - assert result == {"action": "created", "foo": "bar"} - -def test_equals_in_value() -> None: - raw = "path=a=b" - result = parse_sec_event_fields(raw) - assert result == {"path": "a=b"} - -def test_empty_string() -> None: - assert parse_sec_event_fields("") == {} - -def test_none_input() -> None: - assert parse_sec_event_fields(None) == {} - -def test_strips_whitespace() -> None: - raw = " action = created \n seen_at = 2026-01-01 " - result = parse_sec_event_fields(raw) - assert result == {"action": "created", "seen_at": "2026-01-01"} - - -# ===================================================================== -# render_sec_event -# ===================================================================== - - -def test_renders_fields_in_preferred_order() -> None: - fields = { - "commit_sha": "abc123", - "action": "created", - "seen_at": "2026-01-01", - } - rendered = render_sec_event(fields) - lines = rendered.strip().splitlines() - assert lines[0] == "[sec-event]" - assert lines[-1] == "[/sec-event]" - # action should come before commit_sha (preferred order) - assert lines.index("action=created") < lines.index("commit_sha=abc123") - -def test_includes_non_preferred_keys_sorted() -> None: - fields = {"action": "created", "z_custom": "1", "a_extra": "2"} - rendered = render_sec_event(fields) - # extra keys sorted alphabetically after preferred - assert "a_extra=2" in rendered - assert "z_custom=1" in rendered - lines = rendered.strip().splitlines() - idx_a = lines.index("a_extra=2") - idx_z = lines.index("z_custom=1") - assert idx_a < idx_z - -def test_skips_blank_values() -> None: - fields = {"action": "created", "path": " ", "source": ""} - rendered = render_sec_event(fields) - assert "path=" not in rendered - assert "source=" not in rendered - assert "action=created" in rendered - -def test_roundtrip() -> None: - fields = { - "action": "created", - "seen_at": "2026-01-01", - "source": "aquasec", - "gh_alert_number": "42", - "occurrence_fp": "fp123", - } - rendered = render_sec_event(fields) - # Extract inner content (skip opening/closing tags) - inner = "\n".join(rendered.strip().splitlines()[1:-1]) - parsed = parse_sec_event_fields(inner) - assert parsed == fields - - -# ===================================================================== -# strip_sec_events_from_body -# ===================================================================== - - -def test_removes_inline_block() -> None: - body = "Some text\n[sec-event]\naction=created\n[/sec-event]\nMore text" - result = strip_sec_events_from_body(body) - assert "[sec-event]" not in result - assert "Some text" in result - assert "More text" in result - -def test_removes_section_header() -> None: - body = "Intro\n\n## Security Events\nold stuff\n" - result = strip_sec_events_from_body(body) - assert "## Security Events" not in result - assert "Intro" in result - -def test_empty_body() -> None: - result = strip_sec_events_from_body("") - assert result.strip() == "" - -def test_none_body() -> None: - result = strip_sec_events_from_body(None) - assert result.strip() == "" - -def test_no_events() -> None: - body = "Just regular body text\n" - result = strip_sec_events_from_body(body) - assert "Just regular body text" in result - -def test_multiple_inline_blocks() -> None: - body = ( - "Text\n" - "[sec-event]\naction=created\n[/sec-event]\n" - "Middle\n" - "[sec-event]\naction=reopened\n[/sec-event]\n" - "End\n" - ) - result = strip_sec_events_from_body(body) - assert "[sec-event]" not in result - assert "Text" in result - assert "Middle" in result - assert "End" in result - -def test_collapses_excessive_newlines() -> None: - body = "A\n\n\n\n\nB\n" - result = strip_sec_events_from_body(body) - assert "\n\n\n" not in result diff --git a/tests/security/issues/test_sync.py b/tests/security/issues/test_sync.py index bd40352..f039c7a 100644 --- a/tests/security/issues/test_sync.py +++ b/tests/security/issues/test_sync.py @@ -27,7 +27,6 @@ from security.issues.sync import ( _append_notification, _close_resolved_parent_issues, - _comment_child_event, _ensure_child_linked_to_parent, _flush_parent_body_updates, _handle_existing_child_issue, @@ -250,19 +249,14 @@ def test_reopen_parent_dry_run() -> None: assert parent.state == "open" def test_reopen_parent_real(mocker: MockerFixture) -> None: - """Non-dry-run reopens issue and posts sec-event comment.""" + """Non-dry-run reopens issue via state edit.""" mock_edit_state = mocker.patch("security.issues.sync.gh_issue_edit_state", return_value=True) - mock_comment = mocker.patch("security.issues.sync.gh_issue_comment") parent = Issue(number=1, state="closed", title="P", body="b") maybe_reopen_parent_issue( "org/repo", parent, rule_id="R1", dry_run=False, context="reopen_child", child_issue_number=5, ) assert parent.state == "open" mock_edit_state.assert_called_once_with("org/repo", 1, "open") - mock_comment.assert_called_once() - comment_body = mock_comment.call_args[0][2] - assert "reopen" in comment_body - assert "R1" in comment_body def test_reopen_parent_gh_failure(mocker: MockerFixture) -> None: """If gh_issue_edit_state fails, state stays closed.""" @@ -444,12 +438,10 @@ def test_rebuild_body_changed(mocker: MockerFixture, sast_alert: Alert) -> None: def test_rebuild_body_unchanged(sast_alert: Alert) -> None: """When body is identical, no API call is made.""" from security.issues.builder import build_child_issue_body - from security.issues.sec_events import strip_sec_events_from_body secmeta = {"schema": "1", "type": "child", "fingerprint": "fp1"} human_body = build_child_issue_body(sast_alert) body = render_secmeta(secmeta) + "\n\n" + human_body - body = strip_sec_events_from_body(body) issue = Issue(number=1, state="open", title="T", body=body) ctx = _make_alert_context(alert=sast_alert) sync = _make_sync_context() @@ -464,54 +456,6 @@ def test_rebuild_body_dry_run(sast_alert: Alert) -> None: _rebuild_and_apply_child_body(ctx=ctx, sync=sync, issue=issue, secmeta=secmeta) -# ===================================================================== -# _comment_child_event -# ===================================================================== - - -def test_comment_reopen_event(mocker: MockerFixture) -> None: - """Posts a reopen sec-event comment when reopened=True.""" - mock_comment = mocker.patch("security.issues.sync.gh_issue_comment") - issue = Issue(number=1, state="open", title="T", body="b") - ctx = _make_alert_context() - sync = _make_sync_context() - _comment_child_event(ctx=ctx, sync=sync, issue=issue, reopened=True) - mock_comment.assert_called_once() - comment_body = mock_comment.call_args[0][2] - assert "reopen" in comment_body - -def test_comment_occurrence_event_no_comment(mocker: MockerFixture) -> None: - """No sec-event comment when issue is already open (new_occurrence=True but reopened=False).""" - mock_comment = mocker.patch("security.issues.sync.gh_issue_comment") - issue = Issue(number=1, state="open", title="T", body="b") - ctx = _make_alert_context() - sync = _make_sync_context() - _comment_child_event(ctx=ctx, sync=sync, issue=issue, reopened=False) - mock_comment.assert_not_called() - -def test_comment_no_event() -> None: - """No comment when neither reopened nor new_occurrence.""" - issue = Issue(number=1, state="open", title="T", body="b") - ctx = _make_alert_context() - sync = _make_sync_context() - _comment_child_event(ctx=ctx, sync=sync, issue=issue, reopened=False) - -def test_comment_reopen_dry_run() -> None: - """Dry-run mode does not call gh_issue_comment for reopen.""" - issue = Issue(number=1, state="open", title="T", body="b") - ctx = _make_alert_context() - sync = _make_sync_context(dry_run=True) - _comment_child_event(ctx=ctx, sync=sync, issue=issue, reopened=True) - -def test_comment_occurrence_dry_run() -> None: - """No comment in any mode when issue is already open (occurrence-only path).""" - issue = Issue(number=1, state="open", title="T", body="b") - ctx = _make_alert_context() - sync = _make_sync_context(dry_run=True) - # Dry-run should also be silent for already-open issues. - _comment_child_event(ctx=ctx, sync=sync, issue=issue, reopened=False) - - # ===================================================================== # _sync_child_title_and_labels # ===================================================================== @@ -580,7 +524,6 @@ def test_handle_new_child_links_to_parent(mocker: MockerFixture, sast_alert: Ale """When a parent issue exists, the child is linked as a sub-issue.""" mocker.patch("security.issues.sync.gh_issue_create", return_value=42) mock_sub = mocker.patch("security.issues.sync.gh_issue_add_sub_issue_by_number") - mocker.patch("security.issues.sync.gh_issue_comment") parent = Issue(number=1, state="open", title="P", body="pb") ctx = _make_alert_context(alert=sast_alert) sync = _make_sync_context(notifications=[]) @@ -672,7 +615,6 @@ def test_ensure_child_linked_api_failure_no_cache_update(mocker: MockerFixture) def test_ensure_parent_creates_new(mocker: MockerFixture, sast_alert: Alert) -> None: """Creates a parent issue when none exists for the rule_id.""" mock_create = mocker.patch("security.issues.sync.gh_issue_create", return_value=99) - mocker.patch("security.issues.sync.gh_issue_comment") issues: dict[int, Issue] = {} index = IssueIndex(by_fingerprint={}, parent_by_rule_id={}) result = ensure_parent_issue(sast_alert, issues, index, dry_run=False) diff --git a/tests/security/test_constants.py b/tests/security/test_constants.py index 22b5fbd..e189468 100644 --- a/tests/security/test_constants.py +++ b/tests/security/test_constants.py @@ -21,8 +21,6 @@ LABEL_SCOPE_SECURITY, LABEL_SEC_ADEPT_TO_CLOSE, LABEL_TYPE_TECH_DEBT, - SEC_EVENT_OPEN, - SEC_EVENT_REOPEN, SECMETA_TYPE_CHILD, SECMETA_TYPE_PARENT, ) From 4b3a50dc98db824224e9e9d2b3af0eff61ca7e54 Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Wed, 22 Apr 2026 12:15:25 +0200 Subject: [PATCH 03/13] Removing daily updating logic and setting correct issue title. --- src/security/alerts/models.py | 2 + src/security/collect_alert.py | 1 + src/security/issues/builder.py | 22 ++++----- src/security/issues/models.py | 4 +- src/security/issues/secmeta.py | 14 +----- src/security/issues/sync.py | 85 ++++++-------------------------- src/security/issues/templates.py | 1 - tests/security/conftest.py | 3 ++ 8 files changed, 33 insertions(+), 99 deletions(-) diff --git a/src/security/alerts/models.py b/src/security/alerts/models.py index 9c56c93..b1a074f 100644 --- a/src/security/alerts/models.py +++ b/src/security/alerts/models.py @@ -34,6 +34,7 @@ class AlertMetadata: alert_url: str = "" rule_id: str = "" rule_name: str = "" + rule_description: str = "" severity: str = "" confidence: str = "" tags: list[str] = field(default_factory=list) @@ -51,6 +52,7 @@ class AlertMetadata: def __post_init__(self) -> None: self.rule_id = (self.rule_id or "").strip() self.rule_name = (self.rule_name or "").strip() + self.rule_description = (self.rule_description or "").strip() self.severity = (self.severity or "").strip() or "unknown" self.state = (self.state or "").lower().strip() self.tool = (self.tool or "").strip() diff --git a/src/security/collect_alert.py b/src/security/collect_alert.py index 6590378..110fec3 100644 --- a/src/security/collect_alert.py +++ b/src/security/collect_alert.py @@ -127,6 +127,7 @@ def _normalise_alert(alert: dict) -> dict: "alert_url": alert.get("html_url"), "rule_id": rule.get("id"), "rule_name": rule.get("name"), + "rule_description": rule.get("description"), "severity": rule.get("security_severity_level"), "confidence": rule.get("severity"), "tags": rule.get("tags") or [], diff --git a/src/security/issues/builder.py b/src/security/issues/builder.py index a2c4cea..fed75c9 100644 --- a/src/security/issues/builder.py +++ b/src/security/issues/builder.py @@ -87,7 +87,7 @@ def build_parent_template_values(alert: Alert, *, rule_id: str, severity: str) - return { "category": alert.metadata.rule_name or NOT_AVAILABLE, "avd_id": alert.alert_details.vulnerability or rule_id, - "title": rule_id, + "title": alert.metadata.rule_description or rule_id, "severity": severity, "published_date": iso_date(alert.rule_details.published_date or NOT_AVAILABLE), "package_name": alert.rule_details.package_name, @@ -103,16 +103,10 @@ def build_parent_issue_body(alert: Alert) -> str: repo_full = alert.repo secmeta: dict[str, str] = { - "schema": "1", "type": SECMETA_TYPE_PARENT, "repo": repo_full, - "source": "code_scanning", - "tool": alert.metadata.tool, - "severity": severity, "rule_id": rule_id, - "first_seen": iso_date(alert.metadata.created_at), - "last_seen": iso_date(alert.metadata.updated_at), - "postponed_until": "", + "severity": severity, } values = build_parent_template_values(alert, rule_id=rule_id, severity=severity) @@ -120,10 +114,15 @@ def build_parent_issue_body(alert: Alert) -> str: return render_secmeta(secmeta) + "\n\n" + human_body -def build_issue_title(rule_name: str | None, rule_id: str, fingerprint: str) -> str: +def build_issue_title( + rule_description: str | None, + rule_name: str | None, + rule_id: str, + fingerprint: str, +) -> str: """Build the title string for a child issue.""" prefix = fingerprint[:8] if fingerprint else NOT_AVAILABLE - summary = (rule_name or rule_id or "Security finding").strip() or "Security finding" + summary = (rule_description or rule_name or rule_id or "Security finding").strip() return f"[SEC][FP={prefix}] {summary}" @@ -136,7 +135,7 @@ def build_child_issue_body(alert: Alert) -> str: vulnerability = alert.alert_details.vulnerability avd_id = vulnerability if vulnerability.startswith("AVD-") else NOT_AVAILABLE - title = alert.metadata.rule_id + title = alert.metadata.rule_description or alert.metadata.rule_id scm_file = alert.alert_details.scm_file start_line = alert.metadata.start_line @@ -169,7 +168,6 @@ def build_child_issue_body(alert: Alert) -> str: "installed_version": alert.alert_details.installed_version, "fixed_version": alert.rule_details.fixed_version, "reachable": alert.alert_details.reachable, - "scan_date": iso_date(alert.alert_details.scan_date or alert.metadata.updated_at or NOT_AVAILABLE), "first_seen": iso_date(alert.alert_details.first_seen or alert.metadata.created_at or NOT_AVAILABLE), } return render_markdown_template(CHILD_BODY_TEMPLATE, values).strip() + "\n" diff --git a/src/security/issues/models.py b/src/security/issues/models.py index d2c2d63..3bdaa9b 100644 --- a/src/security/issues/models.py +++ b/src/security/issues/models.py @@ -91,13 +91,11 @@ class AlertContext: alert: Alert alert_number: int fingerprint: str - occurrence_fp: str repo: str - first_seen: str - last_seen: str tool: str rule_id: str rule_name: str + rule_description: str severity: str cve: str path: str diff --git a/src/security/issues/secmeta.py b/src/security/issues/secmeta.py index fc1342b..f816ecc 100644 --- a/src/security/issues/secmeta.py +++ b/src/security/issues/secmeta.py @@ -79,22 +79,12 @@ def render_kv_lines( def render_secmeta(secmeta: dict[str, str]) -> str: """Render a secmeta dict as a hidden HTML-comment block for issue bodies.""" preferred_order = [ - "schema", + "type", "fingerprint", "repo", - "source", - "tool", - "severity", - "cve", - "category", "rule_id", - "first_seen", - "last_seen", - "last_seen_commit", - "postponed_until", + "severity", "gh_alert_numbers", - "occurrence_count", - "last_occurrence_fp", ] lines = render_kv_lines(secmeta, preferred_order) return "" diff --git a/src/security/issues/sync.py b/src/security/issues/sync.py index c0d076c..8d471ad 100644 --- a/src/security/issues/sync.py +++ b/src/security/issues/sync.py @@ -24,7 +24,7 @@ import logging -from core.helpers import iso_date, normalize_path +from core.helpers import normalize_path from core.github.issues import ( gh_issue_add_labels, gh_issue_add_sub_issue_by_number, @@ -39,7 +39,6 @@ from core.rendering import render_markdown_template from security.alerts.models import Alert -from security.alerts.parser import compute_occurrence_fp from security.constants import ( LABEL_EPIC, LABEL_SCOPE_SECURITY, @@ -210,10 +209,6 @@ def ensure_parent_issue( if existing is not None: # Keep parent issues aligned to the template as alerts evolve. existing_secmeta = load_secmeta(existing.body) or {"schema": "1"} - existing_first = existing_secmeta.get("first_seen") or iso_date(alert.metadata.created_at) - existing_last = existing_secmeta.get("last_seen") or iso_date(alert.metadata.updated_at) - first_seen_final = min(existing_first, iso_date(alert.metadata.created_at)) - last_seen_final = max(existing_last, iso_date(alert.metadata.updated_at)) existing_severity = str(existing_secmeta.get("severity") or "unknown") existing_severity_cmp = existing_severity.lower() @@ -230,7 +225,7 @@ def ensure_parent_issue( ) if dry_run: logging.info( - "DRY-RUN: severity change on parent #%d (rule_id=%s): %s \u2192 %s", + "DRY-RUN: severity change on parent #%d (rule_id=%s): %s - %s", existing.number, rule_id, existing_severity_cmp, @@ -243,16 +238,10 @@ def ensure_parent_issue( existing_secmeta.update( { - "schema": existing_secmeta.get("schema") or "1", "type": SECMETA_TYPE_PARENT, "repo": repo_full, - "source": existing_secmeta.get("source") or "code_scanning", - "tool": alert.metadata.tool or existing_secmeta.get("tool") or "", "severity": severity_stored, "rule_id": rule_id, - "first_seen": first_seen_final, - "last_seen": last_seen_final, - "postponed_until": existing_secmeta.get("postponed_until", ""), } ) @@ -355,29 +344,17 @@ def _handle_new_child_issue( """Create a new child issue for an alert that has no matching issue yet.""" category = classify_category(ctx.alert) secmeta: dict[str, str] = { - "schema": "1", "type": SECMETA_TYPE_CHILD, "fingerprint": ctx.fingerprint, "repo": ctx.repo, - "source": "code_scanning", - "tool": ctx.tool, - "severity": ctx.severity, - "category": category, "rule_id": ctx.rule_id, - "first_seen": ctx.first_seen, - "last_seen": ctx.last_seen, - "last_seen_commit": ctx.commit_sha, - "postponed_until": "", + "severity": ctx.severity, "gh_alert_numbers": json_list([str(ctx.alert_number)]), - "occurrence_count": "1", - "last_occurrence_fp": ctx.occurrence_fp, } - if ctx.cve: - secmeta["cve"] = ctx.cve human_body = build_child_issue_body(ctx.alert) body = render_secmeta(secmeta) + "\n\n" + human_body - title = build_issue_title(ctx.rule_name, ctx.rule_id, ctx.fingerprint) + title = build_issue_title(ctx.rule_description, ctx.rule_name, ctx.rule_id, ctx.fingerprint) if sync.dry_run: labels = [LABEL_SCOPE_SECURITY, LABEL_TYPE_TECH_DEBT] @@ -386,8 +363,7 @@ def _handle_new_child_issue( commit_short = ctx.commit_sha[:8] if ctx.commit_sha else "" logging.info( "DRY-RUN: create child alert=%d rule_id=%s sev=%s" - " fp=%s tool=%s commit=%s loc=%s title=%r labels=[%s]" - " | secmeta:first_seen=%s last_seen=%s occurrence_count=1 gh_alert_numbers=[%d]", + " fp=%s tool=%s commit=%s loc=%s title=%r labels=[%s] gh_alert_numbers=[%d]", ctx.alert_number, ctx.rule_id, ctx.severity, @@ -397,8 +373,6 @@ def _handle_new_child_issue( loc, title, ",".join(labels), - ctx.first_seen, - ctx.last_seen, ctx.alert_number, ) if parent_issue is None and ctx.rule_id: @@ -518,12 +492,9 @@ def _merge_child_secmeta( *, ctx: AlertContext, issue: Issue, -) -> tuple[dict[str, str], bool]: - """Merge incoming alert data into the child issue's secmeta. - - Returns ``(updated_secmeta, new_occurrence)``. - """ - secmeta = load_secmeta(issue.body) or {"schema": "1"} +) -> dict[str, str]: + """Merge incoming alert data into the child issue's secmeta.""" + secmeta = load_secmeta(issue.body) or {} secmeta.pop("alert_hash", None) existing_alerts = parse_json_list(secmeta.get("gh_alert_numbers")) @@ -532,40 +503,18 @@ def _merge_child_secmeta( if str(ctx.alert_number) not in existing_alerts: existing_alerts.append(str(ctx.alert_number)) - last_occ_fp = secmeta.get("last_occurrence_fp", "") - occurrence_count = int(secmeta.get("occurrence_count") or "0" or 0) - new_occurrence = bool(ctx.occurrence_fp and ctx.occurrence_fp != last_occ_fp) - if occurrence_count <= 0: - occurrence_count = 1 - if new_occurrence: - occurrence_count += 1 - - existing_first = secmeta.get("first_seen") or ctx.first_seen - existing_last = secmeta.get("last_seen") or ctx.last_seen - first_seen_final = min(existing_first, ctx.first_seen) - last_seen_final = max(existing_last, ctx.last_seen) - secmeta.update( { + "type": SECMETA_TYPE_CHILD, "fingerprint": ctx.fingerprint, "repo": ctx.repo, - "source": secmeta.get("source") or "code_scanning", - "tool": ctx.tool or secmeta.get("tool", ""), - "severity": ctx.severity, - "category": classify_category(ctx.alert) or secmeta.get("category", ""), "rule_id": ctx.rule_id or secmeta.get("rule_id", ""), - "first_seen": first_seen_final, - "last_seen": last_seen_final, - "last_seen_commit": ctx.commit_sha or secmeta.get("last_seen_commit", ""), + "severity": ctx.severity, "gh_alert_numbers": json_list(existing_alerts), - "occurrence_count": str(occurrence_count), - "last_occurrence_fp": ctx.occurrence_fp or last_occ_fp, } ) - if ctx.cve: - secmeta["cve"] = ctx.cve - return secmeta, new_occurrence + return secmeta def _rebuild_and_apply_child_body( @@ -598,7 +547,7 @@ def _sync_child_title_and_labels( issue: Issue, ) -> None: """Fix title drift and ensure required labels and priority on the child issue.""" - expected_title = build_issue_title(ctx.rule_name, ctx.rule_id, ctx.fingerprint) + expected_title = build_issue_title(ctx.rule_description, ctx.rule_name, ctx.rule_id, ctx.fingerprint) if expected_title != (issue.title or ""): if sync.dry_run: logging.info( @@ -672,7 +621,7 @@ def _handle_existing_child_issue( parent_issue = find_parent_issue(sync.index, rule_id=ctx.rule_id) _maybe_reopen_child(ctx=ctx, sync=sync, issue=issue, parent_issue=parent_issue) - secmeta, _ = _merge_child_secmeta(ctx=ctx, issue=issue) + secmeta = _merge_child_secmeta(ctx=ctx, issue=issue) _rebuild_and_apply_child_body(ctx=ctx, sync=sync, issue=issue, secmeta=secmeta) _sync_child_title_and_labels(ctx=ctx, sync=sync, issue=issue) @@ -713,11 +662,7 @@ def ensure_issue( "Ensure the collector/scanner includes an 'Alert hash: ...' line." ) - occurrence_fp = compute_occurrence_fp(commit_sha, path, start_line, end_line) - repo_full = alert.repo - first_seen = iso_date(alert.metadata.created_at) - last_seen = iso_date(alert.metadata.updated_at) parent_issue = ensure_parent_issue( alert, @@ -738,13 +683,11 @@ def ensure_issue( alert=alert, alert_number=alert_number, fingerprint=fingerprint, - occurrence_fp=occurrence_fp, repo=repo_full, - first_seen=first_seen, - last_seen=last_seen, tool=alert.metadata.tool, rule_id=rule_id, rule_name=alert.metadata.rule_name, + rule_description=alert.metadata.rule_description, severity=alert.metadata.severity, cve=cve, path=path, diff --git a/src/security/issues/templates.py b/src/security/issues/templates.py index f90aa2d..d2d7ede 100644 --- a/src/security/issues/templates.py +++ b/src/security/issues/templates.py @@ -81,6 +81,5 @@ ## Detection Timeline -- **Scan date:** {{ scan_date }} - **First seen:** {{ first_seen }} """ diff --git a/tests/security/conftest.py b/tests/security/conftest.py index a475954..7edd1aa 100644 --- a/tests/security/conftest.py +++ b/tests/security/conftest.py @@ -38,6 +38,7 @@ "alert_url": "https://github.com/test-org/test-repo/security/code-scanning/303", "rule_id": "req-with-very-false-aquasec-python", "rule_name": "sast", + "rule_description": "Requests with verify=False", "severity": "high", "confidence": "error", "tags": ["HIGH", "sast", "security"], @@ -108,6 +109,7 @@ "alert_url": "https://github.com/test-org/test-repo/security/code-scanning/312", "rule_id": "CVE-2026-25755", "rule_name": "vulnerabilities", + "rule_description": "jsPDF PDF object injection", "severity": "high", "confidence": "error", "tags": ["HIGH", "security", "vulnerabilities"], @@ -179,6 +181,7 @@ "alert_url": "https://github.com/test-org/test-repo/security/code-scanning/317", "rule_id": "AVD-PIPELINE-0008", "rule_name": "pipelineMisconfigurations", + "rule_description": "Dependency not pinned to commit SHA", "severity": "medium", "confidence": "warning", "tags": ["MEDIUM", "pipelineMisconfigurations", "security"], From 876493bc44a457127414be95a809d83783420856 Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Wed, 22 Apr 2026 12:17:54 +0200 Subject: [PATCH 04/13] Adding tests for the last commit. --- src/core/github/client.py | 1 + src/core/github/projects.py | 1 - src/security/issues/sync.py | 2 +- tests/security/alerts/test_models.py | 2 + tests/security/issues/test_builder.py | 43 +++----- tests/security/issues/test_models.py | 5 +- tests/security/issues/test_secmeta.py | 6 +- tests/security/issues/test_sync.py | 125 +++++++++++------------- tests/security/issues/test_templates.py | 3 +- tests/security/test_collect_alert.py | 3 + tests/security/test_constants.py | 7 -- 11 files changed, 84 insertions(+), 114 deletions(-) diff --git a/src/core/github/client.py b/src/core/github/client.py index 3213419..0227f61 100644 --- a/src/core/github/client.py +++ b/src/core/github/client.py @@ -19,6 +19,7 @@ import logging import subprocess + def run_cmd( cmd: list[str], *, diff --git a/src/core/github/projects.py b/src/core/github/projects.py index 4a623f3..6d73c9c 100644 --- a/src/core/github/projects.py +++ b/src/core/github/projects.py @@ -21,7 +21,6 @@ import json import logging - from dataclasses import dataclass from typing import Any diff --git a/src/security/issues/sync.py b/src/security/issues/sync.py index 8d471ad..62867d2 100644 --- a/src/security/issues/sync.py +++ b/src/security/issues/sync.py @@ -208,7 +208,7 @@ def ensure_parent_issue( existing = find_parent_issue(index, rule_id=rule_id) if existing is not None: # Keep parent issues aligned to the template as alerts evolve. - existing_secmeta = load_secmeta(existing.body) or {"schema": "1"} + existing_secmeta = load_secmeta(existing.body) or {} existing_severity = str(existing_secmeta.get("severity") or "unknown") existing_severity_cmp = existing_severity.lower() diff --git a/tests/security/alerts/test_models.py b/tests/security/alerts/test_models.py index 1de2f72..cfd55c3 100644 --- a/tests/security/alerts/test_models.py +++ b/tests/security/alerts/test_models.py @@ -30,12 +30,14 @@ def test_alert_metadata_none_fields_do_not_crash() -> None: severity=None, # type: ignore[arg-type] – mirrors _normalise_alert output rule_id=None, # type: ignore[arg-type] rule_name=None, # type: ignore[arg-type] + rule_description=None, # type: ignore[arg-type] state=None, # type: ignore[arg-type] tool=None, # type: ignore[arg-type] ) assert md.severity == "unknown" assert md.rule_id == "" assert md.rule_name == "" + assert md.rule_description == "" assert md.state == "" assert md.tool == "" diff --git a/tests/security/issues/test_builder.py b/tests/security/issues/test_builder.py index d182e8f..c350ab9 100644 --- a/tests/security/issues/test_builder.py +++ b/tests/security/issues/test_builder.py @@ -278,19 +278,23 @@ def test_contains_confidence(vuln_alert: Alert) -> None: def test_format() -> None: fp = "a1b2c3d4e5f6" - title = build_issue_title("sast", "rule-123", fp) - assert title == "[SEC][FP=a1b2c3d4] sast" + title = build_issue_title("A description", "sast", "rule-123", fp) + assert title == "[SEC][FP=a1b2c3d4] A description" + +def test_fallback_to_rule_name() -> None: + title = build_issue_title(None, "sast", "rule-123", "abcdef12") + assert "sast" in title def test_fallback_to_rule_id() -> None: - title = build_issue_title(None, "rule-123", "abcdef12") + title = build_issue_title(None, None, "rule-123", "abcdef12") assert "rule-123" in title def test_fallback_to_default() -> None: - title = build_issue_title(None, "", "abcdef12") + title = build_issue_title(None, None, "", "abcdef12") assert "Security finding" in title def test_empty_fingerprint() -> None: - title = build_issue_title("sast", "rule-123", "") + title = build_issue_title("A description", "sast", "rule-123", "") assert "N/A" in title @@ -303,7 +307,7 @@ def test_empty_fingerprint() -> None: def test_sast_avd_id(sast_alert: Alert) -> None: body = build_child_issue_body(sast_alert) - assert "req-with-very-false-aquasec-python" in body + assert "Requests with verify=False" in body def test_sast_alert_hash(sast_alert: Alert) -> None: body = build_child_issue_body(sast_alert) @@ -311,7 +315,7 @@ def test_sast_alert_hash(sast_alert: Alert) -> None: def test_sast_title(sast_alert: Alert) -> None: body = build_child_issue_body(sast_alert) - assert "sast" in body + assert "Requests with verify=False" in body def test_sast_message_present(sast_alert: Alert) -> None: body = build_child_issue_body(sast_alert) @@ -333,10 +337,6 @@ def test_sast_reachable_from_msg(sast_alert: Alert) -> None: body = build_child_issue_body(sast_alert) assert "False" in body -def test_sast_scan_date(sast_alert: Alert) -> None: - body = build_child_issue_body(sast_alert) - assert "2026-02-24" in body - def test_sast_first_seen(sast_alert: Alert) -> None: body = build_child_issue_body(sast_alert) assert "2025-09-17" in body @@ -345,7 +345,7 @@ def test_sast_first_seen(sast_alert: Alert) -> None: def test_vuln_avd_id(vuln_alert: Alert) -> None: body = build_child_issue_body(vuln_alert) - assert "CVE-2026-25755" in body + assert "jsPDF PDF object injection" in body def test_vuln_installed_version(vuln_alert: Alert) -> None: body = build_child_issue_body(vuln_alert) @@ -410,21 +410,6 @@ def test_all_template_sections_rendered(vuln_alert: Alert) -> None: assert "## Detection Timeline" in body -def test_scan_date_falls_back_to_metadata_updated_at() -> None: - """When alert_details.scan_date is absent, fall back to metadata.updated_at.""" - alert = Alert.from_dict({ - "metadata": { - "rule_id": "X", - "updated_at": "2026-01-15T10:00:00Z", - "created_at": "2025-12-01T08:00:00Z", - }, - "alert_details": {}, # scan_date absent → defaults to "" - "rule_details": {}, - }) - body = build_child_issue_body(alert) - assert "2026-01-15" in body - - def test_first_seen_falls_back_to_metadata_created_at() -> None: """When alert_details.first_seen is absent, fall back to metadata.created_at.""" alert = Alert.from_dict({ @@ -440,7 +425,7 @@ def test_first_seen_falls_back_to_metadata_created_at() -> None: assert "2025-12-01" in body -def test_scan_date_and_first_seen_yield_na_when_no_fallback() -> None: +def test_first_seen_yields_na_when_no_fallback() -> None: """When neither alert_details nor metadata provide dates, render N/A.""" alert = Alert.from_dict({ "metadata": {"rule_id": "X"}, # no updated_at / created_at @@ -448,4 +433,4 @@ def test_scan_date_and_first_seen_yield_na_when_no_fallback() -> None: "rule_details": {}, }) body = build_child_issue_body(alert) - assert body.count("N/A") >= 2 + assert body.count("N/A") >= 1 diff --git a/tests/security/issues/test_models.py b/tests/security/issues/test_models.py index a0ad502..9df3447 100644 --- a/tests/security/issues/test_models.py +++ b/tests/security/issues/test_models.py @@ -108,9 +108,10 @@ def test_issue_index_creation() -> None: def test_alert_context_creation() -> None: ctx = AlertContext( - alert={}, alert_number=1, fingerprint="fp", occurrence_fp="ofp", - repo="org/repo", first_seen="2026-01-01", last_seen="2026-01-02", + alert={}, alert_number=1, fingerprint="fp", + repo="org/repo", tool="AquaSec", rule_id="R1", rule_name="sast", + rule_description="Test finding description", severity="high", cve="CVE-79", path="src/f.py", start_line=10, end_line=20, commit_sha="abc123", ) diff --git a/tests/security/issues/test_secmeta.py b/tests/security/issues/test_secmeta.py index 8fc28cc..1ac9017 100644 --- a/tests/security/issues/test_secmeta.py +++ b/tests/security/issues/test_secmeta.py @@ -115,10 +115,10 @@ def test_preferred_order() -> None: } rendered = render_secmeta(data) lines = rendered.strip().split("\n") - # schema should appear before fingerprint - schema_idx = next(i for i, l in enumerate(lines) if "schema=" in l) + # type should appear before fingerprint per preferred_order + type_idx = next(i for i, l in enumerate(lines) if "type=" in l) fp_idx = next(i for i, l in enumerate(lines) if "fingerprint=" in l) - assert schema_idx < fp_idx + assert type_idx < fp_idx def test_secmeta_roundtrip() -> None: """Render then parse should recover the original data.""" diff --git a/tests/security/issues/test_sync.py b/tests/security/issues/test_sync.py index f039c7a..6bf9f0e 100644 --- a/tests/security/issues/test_sync.py +++ b/tests/security/issues/test_sync.py @@ -71,13 +71,11 @@ def _make_alert_context(**overrides: Any) -> AlertContext: alert=Alert(), alert_number=1, fingerprint="fp_test_123", - occurrence_fp="occ_fp_test", repo="test-org/test-repo", - first_seen="2026-01-01", - last_seen="2026-01-02", tool="AquaSec", rule_id="CVE-2026-1234", rule_name="sast", + rule_description="Test finding description", severity="high", cve="CVE-2026-1234", path="src/main.py", @@ -127,6 +125,7 @@ def test_indexes_parent_by_rule_id() -> None: assert "CVE-2026-1234" in idx.parent_by_rule_id assert idx.parent_by_rule_id["CVE-2026-1234"] is parent + def test_indexes_child_by_fingerprint() -> None: child = _issue_with_secmeta(2, { "type": "child", @@ -138,6 +137,7 @@ def test_indexes_child_by_fingerprint() -> None: assert "fp_abc123" in idx.by_fingerprint assert idx.by_fingerprint["fp_abc123"] is child + def test_parent_not_in_fingerprint_index() -> None: parent = _issue_with_secmeta(1, { "type": "parent", @@ -148,6 +148,7 @@ def test_parent_not_in_fingerprint_index() -> None: idx = build_issue_index(issues) assert "fp_parent" not in idx.by_fingerprint + def test_first_fingerprint_wins() -> None: """If two children share the same fingerprint, the first one wins.""" c1 = _issue_with_secmeta(10, {"type": "child", "fingerprint": "shared_fp"}) @@ -156,6 +157,7 @@ def test_first_fingerprint_wins() -> None: idx = build_issue_index(issues) assert idx.by_fingerprint["shared_fp"].number == 10 + def test_mixed_issues() -> None: parent = _issue_with_secmeta(1, {"type": "parent", "rule_id": "R1"}) child = _issue_with_secmeta(2, {"type": "child", "fingerprint": "fp1", "rule_id": "R1"}) @@ -164,11 +166,13 @@ def test_mixed_issues() -> None: assert "R1" in idx.parent_by_rule_id assert "fp1" in idx.by_fingerprint + def test_empty() -> None: idx = build_issue_index({}) assert idx.by_fingerprint == {} assert idx.parent_by_rule_id == {} + def test_alert_hash_fallback() -> None: """Fingerprint falls back to alert_hash when fingerprint is absent.""" child = _issue_with_secmeta(5, {"type": "child", "alert_hash": "hash_123"}) @@ -176,6 +180,7 @@ def test_alert_hash_fallback() -> None: idx = build_issue_index(issues) assert "hash_123" in idx.by_fingerprint + def test_no_secmeta_skipped() -> None: """Issue without secmeta block is silently skipped.""" issue = Issue(number=99, state="open", title="No meta", body="Plain body\n") @@ -197,6 +202,7 @@ def test_find_issue_found() -> None: assert result is not None assert result.number == 2 + def test_find_issue_not_found() -> None: idx = build_issue_index({}) assert find_issue_in_index(idx, fingerprint="missing") is None @@ -214,6 +220,7 @@ def test_find_parent_found() -> None: assert result is not None assert result.number == 1 + def test_find_parent_not_found() -> None: idx = build_issue_index({}) assert find_parent_issue(idx, rule_id="missing") is None @@ -232,6 +239,7 @@ def test_reopen_parent_none(mocker: MockerFixture) -> None: ) mock_edit.assert_not_called() + def test_reopen_parent_already_open() -> None: """No-op when parent is already open.""" parent = Issue(number=1, state="open", title="P", body="b") @@ -240,6 +248,7 @@ def test_reopen_parent_already_open() -> None: ) assert parent.state == "open" + def test_reopen_parent_dry_run() -> None: """In dry-run mode, sets state to open without calling gh.""" parent = Issue(number=1, state="closed", title="P", body="b") @@ -248,6 +257,7 @@ def test_reopen_parent_dry_run() -> None: ) assert parent.state == "open" + def test_reopen_parent_real(mocker: MockerFixture) -> None: """Non-dry-run reopens issue via state edit.""" mock_edit_state = mocker.patch("security.issues.sync.gh_issue_edit_state", return_value=True) @@ -258,6 +268,7 @@ def test_reopen_parent_real(mocker: MockerFixture) -> None: assert parent.state == "open" mock_edit_state.assert_called_once_with("org/repo", 1, "open") + def test_reopen_parent_gh_failure(mocker: MockerFixture) -> None: """If gh_issue_edit_state fails, state stays closed.""" mocker.patch("security.issues.sync.gh_issue_edit_state", return_value=False) @@ -282,6 +293,7 @@ def test_append_notification_active() -> None: assert len(notifications) == 1 assert notifications[0].issue_number == 42 + def test_append_notification_none() -> None: """No-op when notifications is None — must not raise.""" _append_notification( @@ -289,7 +301,7 @@ def test_append_notification_none() -> None: severity="high", category="sast", state="new", tool="AquaSec", ) # No assertion needed: the test passes if no exception is raised. - + # ===================================================================== # _merge_child_secmeta @@ -302,47 +314,12 @@ def test_merge_new_alert_number() -> None: "type": "child", "fingerprint": "fp1", "gh_alert_numbers": '["100"]', - "occurrence_count": "1", - "last_occurrence_fp": "old_occ", - "first_seen": "2026-01-01", - "last_seen": "2026-01-01", }) - ctx = _make_alert_context(alert_number=200, fingerprint="fp1", occurrence_fp="new_occ") - secmeta, new_occurrence = _merge_child_secmeta(ctx=ctx, issue=child) + ctx = _make_alert_context(alert_number=200, fingerprint="fp1") + secmeta = _merge_child_secmeta(ctx=ctx, issue=child) assert "200" in secmeta["gh_alert_numbers"] assert "100" in secmeta["gh_alert_numbers"] - assert new_occurrence is True - assert secmeta["occurrence_count"] == "2" -def test_merge_same_occurrence_fp() -> None: - """Same occurrence_fp means no new occurrence counted.""" - child = _issue_with_secmeta(1, { - "type": "child", - "fingerprint": "fp1", - "gh_alert_numbers": '["100"]', - "occurrence_count": "1", - "last_occurrence_fp": "same_occ", - "first_seen": "2026-01-01", - "last_seen": "2026-01-01", - }) - ctx = _make_alert_context(alert_number=100, fingerprint="fp1", occurrence_fp="same_occ") - secmeta, new_occurrence = _merge_child_secmeta(ctx=ctx, issue=child) - assert new_occurrence is False - assert secmeta["occurrence_count"] == "1" - -def test_merge_date_range_expansion() -> None: - """first_seen takes the min, last_seen takes the max.""" - child = _issue_with_secmeta(1, { - "type": "child", - "fingerprint": "fp1", - "first_seen": "2026-02-01", - "last_seen": "2026-02-15", - "occurrence_count": "1", - }) - ctx = _make_alert_context(first_seen="2026-01-15", last_seen="2026-03-01") - secmeta, _ = _merge_child_secmeta(ctx=ctx, issue=child) - assert secmeta["first_seen"] == "2026-01-15" - assert secmeta["last_seen"] == "2026-03-01" def test_merge_removes_alert_hash() -> None: """Legacy alert_hash key is dropped during merge.""" @@ -350,26 +327,11 @@ def test_merge_removes_alert_hash() -> None: "type": "child", "alert_hash": "old_hash", "fingerprint": "fp1", - "occurrence_count": "1", - "first_seen": "2026-01-01", - "last_seen": "2026-01-01", }) ctx = _make_alert_context(fingerprint="fp1") - secmeta, _ = _merge_child_secmeta(ctx=ctx, issue=child) + secmeta = _merge_child_secmeta(ctx=ctx, issue=child) assert "alert_hash" not in secmeta -def test_merge_zero_occurrence_count_reset() -> None: - """occurrence_count <= 0 is reset to at least 1.""" - child_secmeta_str = render_secmeta({ - "type": "child", "fingerprint": "fp1", - "occurrence_count": "0", "last_occurrence_fp": "same_fp", - "first_seen": "2026-01-01", "last_seen": "2026-01-01", - }) - child = Issue(number=1, state="open", title="T", body=child_secmeta_str + "\nBody\n") - ctx = _make_alert_context(fingerprint="fp1", occurrence_fp="same_fp") - secmeta, _ = _merge_child_secmeta(ctx=ctx, issue=child) - assert int(secmeta["occurrence_count"]) >= 1 - # ===================================================================== # _maybe_reopen_child @@ -383,6 +345,7 @@ def test_reopen_child_open_issue() -> None: sync = _make_sync_context() assert _maybe_reopen_child(ctx=ctx, sync=sync, issue=issue, parent_issue=None) is False + def test_reopen_child_dry_run() -> None: """Dry-run marks reopened=True and appends notification.""" body = render_secmeta({"type": "child", "category": "sast"}) + "\nbody" @@ -395,6 +358,7 @@ def test_reopen_child_dry_run() -> None: assert len(notifications) == 1 assert notifications[0].state == "reopen" + def test_reopen_child_real(mocker: MockerFixture) -> None: """Non-dry-run calls gh_issue_edit_state and appends notification.""" mock_edit = mocker.patch("security.issues.sync.gh_issue_edit_state", return_value=True) @@ -408,6 +372,7 @@ def test_reopen_child_real(mocker: MockerFixture) -> None: assert len(notifications) == 1 mock_edit.assert_called_once() + def test_reopen_child_cascades_to_parent(mocker: MockerFixture) -> None: """Reopening child also reopens the closed parent (dry-run).""" mocker.patch("security.issues.sync.gh_issue_edit_state", return_value=True) @@ -435,6 +400,7 @@ def test_rebuild_body_changed(mocker: MockerFixture, sast_alert: Alert) -> None: _rebuild_and_apply_child_body(ctx=ctx, sync=sync, issue=issue, secmeta=secmeta) mock_edit.assert_called_once() + def test_rebuild_body_unchanged(sast_alert: Alert) -> None: """When body is identical, no API call is made.""" from security.issues.builder import build_child_issue_body @@ -447,6 +413,7 @@ def test_rebuild_body_unchanged(sast_alert: Alert) -> None: sync = _make_sync_context() _rebuild_and_apply_child_body(ctx=ctx, sync=sync, issue=issue, secmeta=secmeta) + def test_rebuild_body_dry_run(sast_alert: Alert) -> None: """In dry-run mode, body is not written via API.""" issue = Issue(number=1, state="open", title="T", body="old body") @@ -472,17 +439,19 @@ def test_sync_title_drift_corrected(mocker: MockerFixture) -> None: mock_title.assert_called_once() mock_labels.assert_called_once() + def test_sync_title_already_correct(mocker: MockerFixture) -> None: """Title is not updated when it matches the expected format.""" mock_labels = mocker.patch("security.issues.sync.gh_issue_add_labels") from security.issues.builder import build_issue_title - title = build_issue_title("sast", "CVE-2026-1234", "fp_test_123") + title = build_issue_title("Test finding description", "sast", "CVE-2026-1234", "fp_test_123") issue = Issue(number=1, state="open", title=title, body="b") ctx = _make_alert_context(rule_name="sast", rule_id="CVE-2026-1234", fingerprint="fp_test_123") sync = _make_sync_context() _sync_child_title_and_labels(ctx=ctx, sync=sync, issue=issue) mock_labels.assert_called_once() + def test_sync_title_dry_run() -> None: """Dry-run mode logs instead of calling gh.""" issue = Issue(number=1, state="open", title="Wrong", body="b") @@ -499,7 +468,6 @@ def test_sync_title_dry_run() -> None: def test_handle_new_child_creates_issue(mocker: MockerFixture, sast_alert: Alert) -> None: """Creates a new issue and registers it in the index.""" mock_create = mocker.patch("security.issues.sync.gh_issue_create", return_value=42) - mocker.patch("security.issues.sync.gh_issue_comment") ctx = _make_alert_context(alert=sast_alert, rule_name="sast") issues: dict[int, Issue] = {} index = IssueIndex(by_fingerprint={}, parent_by_rule_id={}) @@ -511,6 +479,7 @@ def test_handle_new_child_creates_issue(mocker: MockerFixture, sast_alert: Alert assert len(sync.notifications) == 1 assert sync.notifications[0].state == "new" + def test_handle_new_child_dry_run(sast_alert: Alert) -> None: """Dry-run does not call gh_issue_create but records notification.""" ctx = _make_alert_context(alert=sast_alert) @@ -520,6 +489,7 @@ def test_handle_new_child_dry_run(sast_alert: Alert) -> None: assert len(notifications) == 1 assert notifications[0].issue_number == 0 + def test_handle_new_child_links_to_parent(mocker: MockerFixture, sast_alert: Alert) -> None: """When a parent issue exists, the child is linked as a sub-issue.""" mocker.patch("security.issues.sync.gh_issue_create", return_value=42) @@ -530,6 +500,7 @@ def test_handle_new_child_links_to_parent(mocker: MockerFixture, sast_alert: Ale _handle_new_child_issue(ctx=ctx, sync=sync, parent_issue=parent) mock_sub.assert_called_once_with("test-org/test-repo", 1, 42) + def test_handle_new_child_create_fails(mocker: MockerFixture, sast_alert: Alert) -> None: """If gh_issue_create returns None, no crash and no index update.""" mocker.patch("security.issues.sync.gh_issue_create", return_value=None) @@ -623,6 +594,7 @@ def test_ensure_parent_creates_new(mocker: MockerFixture, sast_alert: Alert) -> mock_create.assert_called_once() assert sast_alert.metadata.rule_id in index.parent_by_rule_id + def test_ensure_parent_dry_run(sast_alert: Alert) -> None: """Dry-run does not create an issue, returns None.""" issues: dict[int, Issue] = {} @@ -630,14 +602,13 @@ def test_ensure_parent_dry_run(sast_alert: Alert) -> None: result = ensure_parent_issue(sast_alert, issues, index, dry_run=True) assert result is None + def test_ensure_parent_existing_returns_existing(sast_alert: Alert) -> None: """Returns the existing parent issue if one already exists.""" parent = _issue_with_secmeta(10, { "type": "parent", "rule_id": sast_alert.metadata.rule_id, "severity": "high", - "first_seen": "2026-01-01", - "last_seen": "2026-01-01", }) issues = {10: parent} index = build_issue_index(issues) @@ -645,14 +616,13 @@ def test_ensure_parent_existing_returns_existing(sast_alert: Alert) -> None: assert result is not None assert result.number == 10 + def test_ensure_parent_severity_change_detected(sast_alert: Alert) -> None: """Severity change is detected and recorded.""" parent = _issue_with_secmeta(10, { "type": "parent", "rule_id": sast_alert.metadata.rule_id, "severity": "low", - "first_seen": "2026-01-01", - "last_seen": "2026-01-01", }) issues = {10: parent} index = build_issue_index(issues) @@ -662,6 +632,7 @@ def test_ensure_parent_severity_change_detected(sast_alert: Alert) -> None: assert changes[0].old_severity == "low" assert changes[0].new_severity == "high" + def test_ensure_parent_no_rule_id() -> None: """Returns None when alert has no rule_id.""" alert = Alert.from_dict({"metadata": {"rule_id": ""}, "alert_details": {}, "rule_details": {}}) @@ -669,6 +640,7 @@ def test_ensure_parent_no_rule_id() -> None: index = IssueIndex(by_fingerprint={}, parent_by_rule_id={}) assert ensure_parent_issue(alert, issues, index, dry_run=False) is None + def test_ensure_parent_create_fails(mocker: MockerFixture, sast_alert: Alert) -> None: """Returns None if gh_issue_create fails.""" mocker.patch("security.issues.sync.gh_issue_create", return_value=None) @@ -677,14 +649,13 @@ def test_ensure_parent_create_fails(mocker: MockerFixture, sast_alert: Alert) -> result = ensure_parent_issue(sast_alert, issues, index, dry_run=False) assert result is None + def test_ensure_parent_body_deferred(sast_alert: Alert) -> None: """Parent body update is deferred (snapshot captured in parent_original_bodies).""" parent = _issue_with_secmeta(10, { "type": "parent", "rule_id": sast_alert.metadata.rule_id, "severity": "high", - "first_seen": "2026-01-01", - "last_seen": "2026-01-01", }) original_body = parent.body issues = {10: parent} @@ -694,6 +665,7 @@ def test_ensure_parent_body_deferred(sast_alert: Alert) -> None: assert 10 in bods assert bods[10][1] == original_body + def test_ensure_parent_title_drift_corrected(mocker: MockerFixture, sast_alert: Alert) -> None: """Title is updated when it drifts from the expected format.""" mock_title = mocker.patch("security.issues.sync.gh_issue_edit_title", return_value=True) @@ -701,8 +673,6 @@ def test_ensure_parent_title_drift_corrected(mocker: MockerFixture, sast_alert: "type": "parent", "rule_id": sast_alert.metadata.rule_id, "severity": "high", - "first_seen": "2026-01-01", - "last_seen": "2026-01-01", }) parent.title = "Wrong old title" issues = {10: parent} @@ -724,18 +694,21 @@ def test_flush_writes_changed_bodies(mocker: MockerFixture) -> None: _flush_parent_body_updates(bods, {1: issue}, dry_run=False) mock_edit.assert_called_once_with("org/repo", 1, "new body") + def test_flush_skips_unchanged() -> None: """Does not call API if body is unchanged.""" issue = Issue(number=1, state="open", title="T", body="same body") bods = {1: ("org/repo", "same body")} _flush_parent_body_updates(bods, {1: issue}, dry_run=False) + def test_flush_dry_run() -> None: """Dry-run logs instead of calling API.""" issue = Issue(number=1, state="open", title="T", body="new body") bods = {1: ("org/repo", "old body")} _flush_parent_body_updates(bods, {1: issue}, dry_run=True) + def test_flush_missing_issue() -> None: """Skips silently if issue is no longer in the dict.""" bods = {99: ("org/repo", "old body")} @@ -752,10 +725,12 @@ def test_label_orphan_no_orphans() -> None: child = _issue_with_secmeta(1, {"type": "child", "fingerprint": "fp1"}) index = build_issue_index({1: child}) alerts: dict[int, Alert] = { - 100: Alert.from_dict({"metadata": {"state": "open"}, "alert_details": {"alert_hash": "fp1"}, "rule_details": {}}), + 100: Alert.from_dict( + {"metadata": {"state": "open"}, "alert_details": {"alert_hash": "fp1"}, "rule_details": {}}), } _label_orphan_issues(alerts, index, dry_run=False) + def test_label_orphan_found(mocker: MockerFixture) -> None: """Labels child issues that have no matching alert.""" mock_labels = mocker.patch("security.issues.sync.gh_issue_add_labels") @@ -769,6 +744,7 @@ def test_label_orphan_found(mocker: MockerFixture) -> None: label_args = mock_labels.call_args[0][2] assert "sec:adept-to-close" in label_args + def test_label_orphan_dry_run() -> None: """Dry-run: logs but does not call gh.""" child = _issue_with_secmeta(1, { @@ -777,6 +753,7 @@ def test_label_orphan_dry_run() -> None: index = build_issue_index({1: child}) _label_orphan_issues({}, index, dry_run=True) + def test_label_orphan_skips_already_labelled() -> None: """Skips if adept-to-close label is already present.""" child = _issue_with_secmeta(1, { @@ -786,6 +763,7 @@ def test_label_orphan_skips_already_labelled() -> None: index = build_issue_index({1: child}) _label_orphan_issues({}, index, dry_run=False) + def test_label_orphan_skips_closed_issues() -> None: """Closed child issues are not labelled as orphans.""" child = _issue_with_secmeta(1, { @@ -794,6 +772,7 @@ def test_label_orphan_skips_closed_issues() -> None: index = build_issue_index({1: child}) _label_orphan_issues({}, index, dry_run=False) + def test_label_orphan_no_repo_in_secmeta() -> None: """Skips labelling if no repo in secmeta.""" child = _issue_with_secmeta(1, { @@ -861,6 +840,7 @@ def test_ensure_issue_dry_run(sast_alert: Alert) -> None: assert len(notifications) == 1 assert notifications[0].issue_number == 0 + def test_ensure_issue_skips_non_open() -> None: """Alerts with state != 'open' are skipped.""" alert = Alert.from_dict({ @@ -872,6 +852,7 @@ def test_ensure_issue_skips_non_open() -> None: index = IssueIndex(by_fingerprint={}, parent_by_rule_id={}) ensure_issue(alert, _make_sync_context(issues=issues, index=index, dry_run=True)) + def test_ensure_issue_missing_alert_hash_raises() -> None: """Raises SystemExit when alert hash is missing.""" alert = Alert.from_dict({ @@ -884,6 +865,7 @@ def test_ensure_issue_missing_alert_hash_raises() -> None: with pytest.raises(SystemExit, match="alert_hash"): ensure_issue(alert, _make_sync_context(issues=issues, index=index, dry_run=True)) + def test_ensure_issue_missing_alert_details_raises() -> None: """Raises SystemExit when alert_details has no alert_hash.""" alert = Alert.from_dict({ @@ -908,17 +890,19 @@ def test_sync_empty() -> None: assert result.notifications == [] assert result.severity_changes == [] + def test_sync_dry_run_single_alert(sast_alert: Alert) -> None: """Single alert dry-run produces a notification.""" alerts = {303: sast_alert} result = sync_alerts_and_issues(alerts, {}, dry_run=True) assert len(result.notifications) == 1 + def test_sync_severity_change_detected(sast_alert: Alert) -> None: """Severity change on existing parent is captured in result.""" parent = _issue_with_secmeta(10, { "type": "parent", "rule_id": sast_alert.metadata.rule_id, - "severity": "low", "first_seen": "2026-01-01", "last_seen": "2026-01-01", + "severity": "low", }) issues = {10: parent} result = sync_alerts_and_issues({303: sast_alert}, issues, dry_run=True) @@ -957,6 +941,7 @@ def test_init_priority_sync_no_map() -> None: ) assert result is None + def test_init_priority_sync_no_project_number() -> None: """Returns None when project_number is falsy (0 or None).""" result = _init_priority_sync( @@ -964,6 +949,7 @@ def test_init_priority_sync_no_project_number() -> None: ) assert result is None + def test_init_priority_sync_no_org_returns_none() -> None: """Returns None with warning when org cannot be determined.""" alerts = {1: Alert.from_dict({"_repo": ""})} @@ -973,6 +959,7 @@ def test_init_priority_sync_no_org_returns_none() -> None: ) assert result is None + def test_init_priority_sync_field_lookup_fails(mocker: MockerFixture) -> None: """Returns None when gh_project_get_priority_field fails.""" mocker.patch("security.issues.sync.gh_project_get_priority_field", return_value=None) diff --git a/tests/security/issues/test_templates.py b/tests/security/issues/test_templates.py index 97265d2..9399980 100644 --- a/tests/security/issues/test_templates.py +++ b/tests/security/issues/test_templates.py @@ -102,7 +102,7 @@ def test_child_contains_all_placeholders() -> None: "{{ avd_id }}", "{{ alert_hash }}", "{{ title }}", "{{ message }}", "{{ repository_full_name }}", "{{ file_display }}", "{{ file_permalink }}", "{{ package_name }}", "{{ installed_version }}", "{{ fixed_version }}", - "{{ reachable }}", "{{ scan_date }}", "{{ first_seen }}", + "{{ reachable }}", "{{ first_seen }}", ] for ph in expected_placeholders: assert ph in CHILD_BODY_TEMPLATE, f"Missing placeholder: {ph}" @@ -120,7 +120,6 @@ def test_child_renders_without_error() -> None: "installed_version": "1.0", "fixed_version": "2.0", "reachable": "True", - "scan_date": "2026-01-01", "first_seen": "2026-01-01", } result = render_markdown_template(CHILD_BODY_TEMPLATE, values) diff --git a/tests/security/test_collect_alert.py b/tests/security/test_collect_alert.py index 57fce59..bc701b3 100644 --- a/tests/security/test_collect_alert.py +++ b/tests/security/test_collect_alert.py @@ -49,6 +49,7 @@ "rule": { "id": "rule-1", "name": "sast", + "description": "Requests with verify=False", "security_severity_level": "high", "severity": "error", "tags": ["HIGH", "sast"], @@ -263,6 +264,7 @@ def test_normalise_alert_metadata() -> None: assert meta["state"] == "open" assert meta["rule_id"] == "rule-1" assert meta["rule_name"] == "sast" + assert meta["rule_description"] == "Requests with verify=False" assert meta["severity"] == "high" assert meta["confidence"] == "error" assert meta["tags"] == ["HIGH", "sast"] @@ -298,6 +300,7 @@ def test_normalise_alert_minimal() -> None: assert meta["alert_number"] is None assert meta["state"] is None assert meta["rule_id"] is None + assert meta["rule_description"] is None assert meta["tool"] is None assert meta["file"] is None assert meta["tags"] == [] diff --git a/tests/security/test_constants.py b/tests/security/test_constants.py index e189468..91560d2 100644 --- a/tests/security/test_constants.py +++ b/tests/security/test_constants.py @@ -39,13 +39,6 @@ def test_adept_to_close() -> None: assert LABEL_SEC_ADEPT_TO_CLOSE == "sec:adept-to-close" -def test_open() -> None: - assert SEC_EVENT_OPEN == "open" - -def test_reopen() -> None: - assert SEC_EVENT_REOPEN == "reopen" - - def test_parent() -> None: assert SECMETA_TYPE_PARENT == "parent" From 36f81210b54237bc4e671acf8f96bdd36d7bbf18 Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Wed, 22 Apr 2026 14:08:41 +0200 Subject: [PATCH 05/13] Adding test config for test purposes. --- .github/workflows/aquasec-scan.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/aquasec-scan.yml b/.github/workflows/aquasec-scan.yml index a773f77..5e7ca0a 100644 --- a/.github/workflows/aquasec-scan.yml +++ b/.github/workflows/aquasec-scan.yml @@ -109,7 +109,7 @@ jobs: uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: repository: AbsaOSS/organizational-workflows - ref: master + ref: feature/remove-obsolete-parts path: org-workflows persist-credentials: false From 7e00a24ecc1ba974766367e22360a08fde377468 Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Wed, 22 Apr 2026 14:23:42 +0200 Subject: [PATCH 06/13] Removing test config --- .github/workflows/aquasec-scan.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/aquasec-scan.yml b/.github/workflows/aquasec-scan.yml index 5e7ca0a..a773f77 100644 --- a/.github/workflows/aquasec-scan.yml +++ b/.github/workflows/aquasec-scan.yml @@ -109,7 +109,7 @@ jobs: uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd with: repository: AbsaOSS/organizational-workflows - ref: feature/remove-obsolete-parts + ref: master path: org-workflows persist-credentials: false From 1bf5928131dc3465f7f18ce50bfefe33d950cbdc Mon Sep 17 00:00:00 2001 From: "Tobias.Mikula" Date: Wed, 22 Apr 2026 15:36:05 +0200 Subject: [PATCH 07/13] Detecting the format symbols in description. --- .github/copilot-instructions.md | 2 +- src/core/helpers.py | 24 +- src/security/issues/builder.py | 10 +- tests/core/test_helpers.py | 73 +++++ tests/security/issues/test_builder.py | 438 ++++++++++---------------- 5 files changed, 272 insertions(+), 275 deletions(-) create mode 100644 tests/core/test_helpers.py diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 8b5c60e..f89c722 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -41,7 +41,7 @@ Python style - F-strings in exceptions: `raise ValueError(f"Error {var}")` - Google-style docstrings - Single blank line at end of file -- No documentation for `__init__` methods +- No documentation for `__init__` methods and test modules Patterns - Classes with `__init__` cannot throw exceptions diff --git a/src/core/helpers.py b/src/core/helpers.py index af42f6e..ea1d4a6 100644 --- a/src/core/helpers.py +++ b/src/core/helpers.py @@ -14,12 +14,21 @@ # limitations under the License. # -"""Pure utility functions – date helpers, hashing, and path normalisation.""" +"""Pure utility functions""" import hashlib import re from datetime import datetime, timezone +# Matches lines that start with 1-6 '#' followed by a space +_HEADING_RE = re.compile(r"^(#{1,6}\s)", re.MULTILINE) +# Matches '>' at the start of a line (blockquote) +_BLOCKQUOTE_RE = re.compile(r"^(>)", re.MULTILINE) +# Matches horizontal rules: three or more -, *, or _ on a line by themselves +_HR_RE = re.compile(r"^([-*_]{3,})\s*$", re.MULTILINE) +# Matches '|' at the start of a line (table row). +_TABLE_RE = re.compile(r"^(\|)", re.MULTILINE) + def utc_today() -> str: """Return today's date in UTC as an ISO-8601 string (``YYYY-MM-DD``).""" @@ -50,3 +59,16 @@ def normalize_path(path: str | None) -> str: p = p.lstrip("/") p = re.sub(r"/+", "/", p) return p + + +def sanitize_markdown(text: str) -> str: + """Escape block-level Markdown so text renders as plain content in an issue body.""" + if not text: + return text + + sanitized = _HEADING_RE.sub(r"\\\1", text) + sanitized = _BLOCKQUOTE_RE.sub(r"\\\1", sanitized) + sanitized = _HR_RE.sub(r"\\\1", sanitized) + sanitized = _TABLE_RE.sub(r"\\\1", sanitized) + + return sanitized diff --git a/src/security/issues/builder.py b/src/security/issues/builder.py index fed75c9..3e7cbac 100644 --- a/src/security/issues/builder.py +++ b/src/security/issues/builder.py @@ -18,7 +18,7 @@ from typing import Any -from core.helpers import iso_date +from core.helpers import iso_date, sanitize_markdown from core.rendering import render_markdown_template from security.constants import NOT_AVAILABLE, SECMETA_TYPE_PARENT @@ -60,7 +60,7 @@ def alert_extra_data(alert: Alert) -> dict[str, Any]: "impact": alert.rule_details.impact, "likelihood": alert.rule_details.likelihood, "confidence": alert.rule_details.confidence, - "remediation": alert.rule_details.remediation, + "remediation": sanitize_markdown(alert.rule_details.remediation), "references": references, } @@ -87,7 +87,7 @@ def build_parent_template_values(alert: Alert, *, rule_id: str, severity: str) - return { "category": alert.metadata.rule_name or NOT_AVAILABLE, "avd_id": alert.alert_details.vulnerability or rule_id, - "title": alert.metadata.rule_description or rule_id, + "title": sanitize_markdown(alert.metadata.rule_description or rule_id), "severity": severity, "published_date": iso_date(alert.rule_details.published_date or NOT_AVAILABLE), "package_name": alert.rule_details.package_name, @@ -135,7 +135,7 @@ def build_child_issue_body(alert: Alert) -> str: vulnerability = alert.alert_details.vulnerability avd_id = vulnerability if vulnerability.startswith("AVD-") else NOT_AVAILABLE - title = alert.metadata.rule_description or alert.metadata.rule_id + title = sanitize_markdown(alert.metadata.rule_description or alert.metadata.rule_id) scm_file = alert.alert_details.scm_file start_line = alert.metadata.start_line @@ -151,7 +151,7 @@ def build_child_issue_body(alert: Alert) -> str: file_display = file_name or NOT_AVAILABLE alert_hash = alert.alert_details.alert_hash - message = alert.alert_details.message + message = sanitize_markdown(alert.alert_details.message) category = classify_category(alert) diff --git a/tests/core/test_helpers.py b/tests/core/test_helpers.py new file mode 100644 index 0000000..0ae35a7 --- /dev/null +++ b/tests/core/test_helpers.py @@ -0,0 +1,73 @@ +# +# Copyright 2026 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import pytest + +from core.helpers import sanitize_markdown + + +# sanitize_markdown + + +@pytest.mark.parametrize( + "text", + [None, "", "Normal text without markdown", "text with numbers 123"], + ids=["none", "empty", "plain", "numbers"], +) +def test_passthrough_unchanged(text: str | None) -> None: + assert sanitize_markdown(text) == text + + +@pytest.mark.parametrize( + "raw, expected", + [ + ("# Heading one", r"\# Heading one"), + ("## Heading two", r"\## Heading two"), + ("### Deep heading", r"\### Deep heading"), + ("> quoted text", r"\> quoted text"), + ("| col1 | col2 |", r"\| col1 | col2 |"), + ("---", r"\---"), + ], + ids=["h1", "h2", "h3", "blockquote", "table", "hr"], +) +def test_block_markdown_escaped(raw: str, expected: str) -> None: + assert expected == sanitize_markdown(raw) + + +@pytest.mark.parametrize( + "text", + ["some **bold** text", "some __bold__ text", "use `code` here"], + ids=["bold", "underscore-bold", "backtick"], +) +def test_inline_markdown_preserved(text: str) -> None: + assert text == sanitize_markdown(text) + + +def test_heading_in_multiline() -> None: + text = "First line\n## Second is heading\nThird line" + result = sanitize_markdown(text) + assert r"\## Second is heading" in result + assert result.startswith("First line\n") + + +def test_real_aquasec_message() -> None: + msg = ( + "## Black is the uncompromising Python code formatter. " + "Prior to 26.3.1, Black writes a cache file." + ) + result = sanitize_markdown(msg) + assert not result.startswith("## ") + assert result.startswith(r"\## ") diff --git a/tests/security/issues/test_builder.py b/tests/security/issues/test_builder.py index c350ab9..7bbcd56 100644 --- a/tests/security/issues/test_builder.py +++ b/tests/security/issues/test_builder.py @@ -14,8 +14,6 @@ # limitations under the License. # -"""Unit tests for ``utils.issue_builder``.""" - import pytest from security.issues.builder import ( @@ -31,17 +29,14 @@ # ===================================================================== -# Low-level helpers +# alert_extra_data # ===================================================================== def test_extra_data_from_nested() -> None: """Extra data is synthesised from nested metadata + rule_details.""" alert = Alert.from_dict({ - "metadata": { - "rule_id": "CVE-123", - "rule_name": "sast", - }, + "metadata": {"rule_id": "CVE-123", "rule_name": "sast"}, "rule_details": { "confidence": "error", "owasp": "https://owasp.org/Top10/A07", @@ -57,340 +52,234 @@ def test_extra_data_from_nested() -> None: assert extra["confidence"] == "error" assert extra["category"] == "sast" + def test_extra_data_non_cve() -> None: """Non-CVE rule_id results in N/A for cve field.""" - alert = Alert.from_dict({ - "metadata": {"rule_id": "RULE-1", "rule_name": "sast"}, - "rule_details": {}, - }) - extra = alert_extra_data(alert) - assert extra["cve"] == "N/A" + alert = Alert.from_dict({"metadata": {"rule_id": "RULE-1", "rule_name": "sast"}, "rule_details": {}}) + assert alert_extra_data(alert)["cve"] == "N/A" -def test_sast(sast_alert: Alert) -> None: - assert classify_category(sast_alert) == "sast" +# ===================================================================== +# classify_category +# ===================================================================== -def test_vuln(vuln_alert: Alert) -> None: - assert classify_category(vuln_alert) == "vulnerabilities" -def test_empty() -> None: - assert classify_category(Alert()) == "" +@pytest.mark.parametrize("raw, expected", [ + ({"metadata": {"rule_name": "sast"}}, "sast"), + ({"metadata": {"rule_name": "vulnerabilities"}}, "vulnerabilities"), + ({}, ""), +], ids=["sast", "vulnerabilities", "empty"]) +def test_classify_category(raw: dict, expected: str) -> None: + assert expected == classify_category(Alert.from_dict(raw)) # ===================================================================== -# Parent issue builders +# build_parent_issue_title # ===================================================================== -def test_with_severity() -> None: - assert build_parent_issue_title("CVE-2026-25755", "high") == ( - "[HIGH] Security Alert \u2013 CVE-2026-25755" - ) +@pytest.mark.parametrize("severity, expected", [ + ("high", "[HIGH] Security Alert \u2013 CVE-2026-25755"), + ("", "Security Alert \u2013 CVE-2026-25755"), +], ids=["with_severity", "without_severity"]) +def test_build_parent_issue_title(severity: str, expected: str) -> None: + assert expected == build_parent_issue_title("CVE-2026-25755", severity) + -def test_without_severity() -> None: - assert build_parent_issue_title("CVE-2026-25755") == ( - "Security Alert \u2013 CVE-2026-25755" - ) +# ===================================================================== +# build_parent_template_values +# ===================================================================== def test_sast_category_from_rule_name(sast_alert: Alert) -> None: - vals = build_parent_template_values( - sast_alert, rule_id="req-with-very-false-aquasec-python", severity="high" - ) + vals = build_parent_template_values(sast_alert, rule_id="req-with-very-false-aquasec-python", severity="high") assert vals["category"] == "sast" + def test_vuln_category_from_rule_name(vuln_alert: Alert) -> None: - vals = build_parent_template_values( - vuln_alert, rule_id="CVE-2026-25755", severity="high" - ) + vals = build_parent_template_values(vuln_alert, rule_id="CVE-2026-25755", severity="high") assert vals["category"] == "vulnerabilities" + def test_avd_id_uses_vulnerability(sast_alert: Alert) -> None: - vals = build_parent_template_values( - sast_alert, rule_id="req-with-very-false-aquasec-python", severity="high" - ) + vals = build_parent_template_values(sast_alert, rule_id="req-with-very-false-aquasec-python", severity="high") assert vals["avd_id"] == "req-with-very-false-aquasec-python" -def test_published_date_from_rule_details(sast_alert: Alert) -> None: - """Published date comes from rule_details.published_date.""" - vals = build_parent_template_values( - sast_alert, rule_id="test", severity="high" - ) - # SAST fixture has rule_details.published_date = None; absent dates map to N/A - assert vals["published_date"] == "N/A" - - -def test_published_date_none_in_raw_dict() -> None: - """published_date=None in the raw dict (as returned by _parse_rule_details for a - missing field) must yield N/A, not today's date via iso_date(None). - """ - raw: dict = { - "metadata": {"rule_id": "x", "severity": "low"}, - "rule_details": {"published_date": None}, - } - alert = Alert.from_dict(raw) - vals = build_parent_template_values(alert, rule_id="x", severity="low") - assert vals["published_date"] == "N/A" - - -def test_published_date_absent_from_raw_dict() -> None: - """published_date key absent entirely from rule_details must also yield N/A.""" - raw: dict = { - "metadata": {"rule_id": "x", "severity": "low"}, - "rule_details": {}, - } - alert = Alert.from_dict(raw) - vals = build_parent_template_values(alert, rule_id="x", severity="low") - assert vals["published_date"] == "N/A" + +@pytest.mark.parametrize("rule_details", [ + {"published_date": None}, + {}, +], ids=["published_date_none", "published_date_absent"]) +def test_published_date_yields_na(rule_details: dict) -> None: + alert = Alert.from_dict({"metadata": {"rule_id": "x", "severity": "low"}, "rule_details": rule_details}) + assert "N/A" == build_parent_template_values(alert, rule_id="x", severity="low")["published_date"] def test_extra_data_synthesised(sast_alert: Alert) -> None: - """Fields are synthesised from the nested alert structure.""" - vals = build_parent_template_values( - sast_alert, rule_id="test", severity="high" - ) - extra = vals["extraData"] + extra = build_parent_template_values(sast_alert, rule_id="test", severity="high")["extraData"] assert isinstance(extra, dict) assert extra["confidence"] == "error" assert extra["category"] == "sast" - # OWASP reference derived from rule_details.owasp assert "owasp" in extra["owasp"].lower() + def test_extra_data_references(vuln_alert: Alert) -> None: - vals = build_parent_template_values( - vuln_alert, rule_id="CVE-2026-25755", severity="high" - ) - refs = vals["extraData"]["references"] + refs = build_parent_template_values(vuln_alert, rule_id="CVE-2026-25755", severity="high")["extraData"]["references"] assert "redhat.com" in refs -def test_references_fallback_to_metadata_urls() -> None: - """When rule_details.references is absent, fall back to metadata.help_uri / alert_url.""" +def test_all_template_keys_present(sast_alert: Alert) -> None: + vals = build_parent_template_values(sast_alert, rule_id="test", severity="high") + required = {"category", "avd_id", "title", "severity", "published_date", "package_name", "fixed_version", "extraData"} + assert required.issubset(vals.keys()) + + +def test_extra_data_sub_keys(sast_alert: Alert) -> None: + extra = build_parent_template_values(sast_alert, rule_id="test", severity="high")["extraData"] + required_extra = {"cve", "owasp", "category", "impact", "likelihood", "confidence", "remediation", "references"} + assert required_extra.issubset(extra.keys()) + + +# ===================================================================== +# References fallback (_synthesize_references) +# ===================================================================== + + +def test_references_fallback_to_both_urls() -> None: + """When rule_details.references is absent, both help_uri and alert_url are included.""" alert = Alert.from_dict({ "metadata": { "rule_id": "RULE-1", "help_uri": "https://example.com/rule", "alert_url": "https://github.com/org/repo/security/code-scanning/1", }, - "rule_details": {}, # references will be normalised to N/A - }) - extra = alert_extra_data(alert) - assert "https://example.com/rule" in extra["references"] - assert "https://github.com/org/repo/security/code-scanning/1" in extra["references"] - - -def test_references_fallback_help_uri_only() -> None: - """Fallback uses only help_uri when alert_url is absent.""" - alert = Alert.from_dict({ - "metadata": { - "rule_id": "RULE-2", - "help_uri": "https://docs.example.com/cve", - }, "rule_details": {}, }) - extra = alert_extra_data(alert) - assert "https://docs.example.com/cve" in extra["references"] + refs = alert_extra_data(alert)["references"] + assert "https://example.com/rule" in refs + assert "https://github.com/org/repo/security/code-scanning/1" in refs -def test_references_no_fallback_urls_yields_na() -> None: - """Fallback returns N/A when metadata has no useful URLs either.""" - alert = Alert.from_dict({ - "metadata": {"rule_id": "RULE-3"}, - "rule_details": {}, - }) - extra = alert_extra_data(alert) - assert extra["references"] == "N/A" +@pytest.mark.parametrize("metadata, expected", [ + ({"rule_id": "RULE-2", "help_uri": "https://docs.example.com/cve"}, "https://docs.example.com/cve"), + ({"rule_id": "RULE-3"}, "N/A"), +], ids=["help_uri_only", "no_urls_yields_na"]) +def test_references_fallback(metadata: dict, expected: str) -> None: + alert = Alert.from_dict({"metadata": metadata, "rule_details": {}}) + assert expected in alert_extra_data(alert)["references"] def test_references_not_overridden_when_present() -> None: - """rule_details.references is used as-is when it contains a value.""" + """rule_details.references is used as-is; metadata URLs are ignored.""" alert = Alert.from_dict({ - "metadata": { - "rule_id": "RULE-4", - "help_uri": "https://should-not-appear.example.com", - }, + "metadata": {"rule_id": "RULE-4", "help_uri": "https://should-not-appear.example.com"}, "rule_details": {"references": "- https://explicit-ref.example.com"}, }) - extra = alert_extra_data(alert) - assert "https://explicit-ref.example.com" in extra["references"] - assert "should-not-appear" not in extra["references"] - -def test_all_template_keys_present(sast_alert: Alert) -> None: - """Every placeholder in PARENT_BODY_TEMPLATE has a corresponding value.""" - vals = build_parent_template_values( - sast_alert, rule_id="test", severity="high" - ) - required = { - "category", "avd_id", "title", "severity", - "published_date", "package_name", - "fixed_version", "extraData", - } - assert required.issubset(vals.keys()) - -def test_extra_data_sub_keys(sast_alert: Alert) -> None: - """All extraData keys referenced in the template are present.""" - vals = build_parent_template_values( - sast_alert, rule_id="test", severity="high" - ) - extra = vals["extraData"] - required_extra = { - "cve", "owasp", "category", "impact", - "likelihood", "confidence", "remediation", "references", - } - assert required_extra.issubset(extra.keys()) + refs = alert_extra_data(alert)["references"] + assert "https://explicit-ref.example.com" in refs + assert "should-not-appear" not in refs # ===================================================================== -# Parent issue body (full render) +# build_parent_issue_body # ===================================================================== -def test_contains_secmeta_block(sast_alert: Alert) -> None: - body = build_parent_issue_body(sast_alert) - assert "