diff --git a/.github/workflows/remove-adept-to-close-on-issue-close.yml b/.github/workflows/remove-adept-to-close-on-issue-close.yml deleted file mode 100644 index e41a30f..0000000 --- a/.github/workflows/remove-adept-to-close-on-issue-close.yml +++ /dev/null @@ -1,73 +0,0 @@ -# -# Copyright 2026 ABSA Group Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Reusable workflow – Remove sec:adept-to-close label when an issue is closed. -# -# Called from application repositories via workflow_call. -# The caller must trigger on `issues: [closed]`. -# Note: for `workflow_call`, the called workflow receives the same event payload as the caller, -# so `context.payload` (aka `github.event`) is populated without needing to "forward" it via inputs. - -name: Remove sec:adept-to-close on close - -on: - workflow_call: - -permissions: - issues: write - -jobs: - cleanup-label: - runs-on: ubuntu-latest - steps: - - name: Remove label when conditions match - uses: actions/github-script@3a2844b7e9c422d3c10d287c895573f7108da1b3 - with: - script: | - const issue = context.payload.issue; - - // Safety: ignore PRs (they can appear as issues in GitHub UI) - if (issue.pull_request) { - core.info('Skipping: payload refers to a pull request, not an issue.'); - return; - } - - const labels = (issue.labels ?? []) - .map(l => (typeof l === 'string' ? l : l?.name)) - .filter(Boolean); - - const hasScopeSecurity = labels.includes('scope:security'); - const hasTechDebt = labels.includes('type:tech-debt'); - const hasAdeptToClose = labels.includes('sec:adept-to-close'); - - if (!hasScopeSecurity || !hasTechDebt) { - core.info( - `Skipping: required labels missing (scope:security=${hasScopeSecurity}, type:tech-debt=${hasTechDebt}).` - ); - return; - } - - if (!hasAdeptToClose) { - core.info('No-op: label sec:adept-to-close is not present on the issue.'); - return; - } - - await github.rest.issues.removeLabel({ - owner: context.repo.owner, - repo: context.repo.repo, - issue_number: issue.number, - name: 'sec:adept-to-close', - }); diff --git a/pyproject.toml b/pyproject.toml index 957e1a1..be4f01c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,11 +8,7 @@ target-version = ['py314'] force-exclude = '''test''' [tool.coverage.run] -omit = [ - "tests/*", - "src/security/derive_team_security_metrics.py", - "src/security/extract_team_security_stats.py", -] +omit = ["tests/*"] [tool.mypy] check_untyped_defs = true diff --git a/src/core/github/client.py b/src/core/github/client.py index 12b39ad..0227f61 100644 --- a/src/core/github/client.py +++ b/src/core/github/client.py @@ -18,29 +18,26 @@ import logging import subprocess -from collections.abc import Mapping def run_cmd( cmd: list[str], *, capture_output: bool = True, - env: Mapping[str, str] | None = None, ) -> subprocess.CompletedProcess: """Run *cmd* as a subprocess and return the completed process.""" - return subprocess.run(cmd, check=False, capture_output=capture_output, text=True, env=env) + return subprocess.run(cmd, check=False, capture_output=capture_output, text=True) def run_gh( args: list[str], *, capture_output: bool = True, - env: Mapping[str, str] | None = None, ) -> subprocess.CompletedProcess: """Run a ``gh`` CLI command and return the completed process.""" cmd = ["gh"] + args try: - return run_cmd(cmd, capture_output=capture_output, env=env) + return run_cmd(cmd, capture_output=capture_output) except FileNotFoundError as exc: logging.error("gh CLI not found. Install and authenticate gh.") raise SystemExit(1) from exc diff --git a/src/core/github/issues.py b/src/core/github/issues.py index cefffc3..64c5a14 100644 --- a/src/core/github/issues.py +++ b/src/core/github/issues.py @@ -280,6 +280,21 @@ def gh_issue_add_labels(repo: str, number: int, labels: list[str]) -> None: logging.warning("Failed to add labels to #%d%s: %s", number, _not_found_hint(res), res.stderr) +def gh_issue_remove_labels(repo: str, number: int, labels: list[str]) -> None: + """Remove *labels* from issue *number* (idempotent).""" + if not labels: + return + + args: list[str] = ["issue", "edit", str(number), "--repo", repo] + + for label in labels: + args += ["--remove-label", label] + + res = run_gh(args) + if res.returncode != 0: + logging.warning("Failed to remove labels from #%d%s: %s", number, _not_found_hint(res), res.stderr) + + def gh_issue_comment(repo: str, number: int, body: str) -> bool: """Post a comment with *body* on issue *number*. diff --git a/src/core/github/projects.py b/src/core/github/projects.py index 7e16f04..6d73c9c 100644 --- a/src/core/github/projects.py +++ b/src/core/github/projects.py @@ -21,8 +21,6 @@ import json import logging -import os -from collections.abc import Mapping from dataclasses import dataclass from typing import Any @@ -52,21 +50,11 @@ class ProjectPriorityField: def _run_graphql(query: str, variables: dict[str, Any] | None = None) -> dict[str, Any] | None: - """Execute a GraphQL query via ``gh api graphql`` and return parsed JSON. - - When ``GH_PROJECT_ONLY_TOKEN`` is set in the environment the GraphQL call is made - with that token instead of the default ``GH_TOKEN``. This allows cross-org - project access while the rest of the pipeline continues to use the scoped - ``github.token``. - """ + """Execute a GraphQL query via ``gh api graphql`` and return parsed JSON.""" args = ["api", "graphql", "-f", f"query={query}"] for k, v in (variables or {}).items(): args += ["-F", f"{k}={v}"] - env: Mapping[str, str] | None = None - project_token = os.environ.get("GH_PROJECT_ONLY_TOKEN", "") - if project_token: - env = {**os.environ, "GH_TOKEN": project_token} - res = run_gh(args, env=env) + res = run_gh(args) if res.returncode != 0: logging.warning(f"GraphQL call failed: {res.stderr}") return None diff --git a/src/security/alerts/models.py b/src/security/alerts/models.py index 9c56c93..b1a074f 100644 --- a/src/security/alerts/models.py +++ b/src/security/alerts/models.py @@ -34,6 +34,7 @@ class AlertMetadata: alert_url: str = "" rule_id: str = "" rule_name: str = "" + rule_description: str = "" severity: str = "" confidence: str = "" tags: list[str] = field(default_factory=list) @@ -51,6 +52,7 @@ class AlertMetadata: def __post_init__(self) -> None: self.rule_id = (self.rule_id or "").strip() self.rule_name = (self.rule_name or "").strip() + self.rule_description = (self.rule_description or "").strip() self.severity = (self.severity or "").strip() or "unknown" self.state = (self.state or "").lower().strip() self.tool = (self.tool or "").strip() diff --git a/src/security/collect_alert.py b/src/security/collect_alert.py index 6590378..110fec3 100644 --- a/src/security/collect_alert.py +++ b/src/security/collect_alert.py @@ -127,6 +127,7 @@ def _normalise_alert(alert: dict) -> dict: "alert_url": alert.get("html_url"), "rule_id": rule.get("id"), "rule_name": rule.get("name"), + "rule_description": rule.get("description"), "severity": rule.get("security_severity_level"), "confidence": rule.get("severity"), "tags": rule.get("tags") or [], diff --git a/src/security/constants.py b/src/security/constants.py index ed32aed..ae85d38 100644 --- a/src/security/constants.py +++ b/src/security/constants.py @@ -21,9 +21,6 @@ LABEL_EPIC = "epic" LABEL_SEC_ADEPT_TO_CLOSE = "sec:adept-to-close" -SEC_EVENT_OPEN = "open" -SEC_EVENT_REOPEN = "reopen" - SECMETA_TYPE_PARENT = "parent" SECMETA_TYPE_CHILD = "child" diff --git a/src/security/derive_team_security_metrics.py b/src/security/derive_team_security_metrics.py deleted file mode 100644 index 8316e66..0000000 --- a/src/security/derive_team_security_metrics.py +++ /dev/null @@ -1,208 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright 2026 ABSA Group Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# pylint: skip-file -""" -Derive team security metrics from issue snapshots (Issues-only). - -Resurfacing definition (B): -- A fingerprint is considered 'resurfaced' when its occurrence_count transitions - from 0 in the previous snapshot to >0 in the current snapshot. - -Inputs: -- data/issues_snapshot.json (required) -- data/issues_snapshot.prev.json (optional; if missing, resurfacing cannot be computed) - -Outputs: -- reports/metrics.json -- reports/summary.md (appends derived metrics) -""" - -import json -import logging -import os -from datetime import datetime -from typing import Any, Dict, List, Optional - -SNAPSHOT_CUR = os.environ.get("SNAPSHOT_CURRENT", "data/issues_snapshot.json") -SNAPSHOT_PREV = os.environ.get("SNAPSHOT_PREVIOUS", "data/issues_snapshot.prev.json") - -OUT_METRICS_JSON = os.environ.get("OUT_METRICS_JSON", "reports/metrics.json") -OUT_SUMMARY_MD = os.environ.get("OUT_SUMMARY_MD", "reports/summary.md") - - -def require_env(key: str) -> str: - """Return the value of environment variable *key*, or exit.""" - try: - return os.environ[key] - except KeyError as exc: - raise SystemExit(f"Missing required environment variable: {key}") from exc - - -TEAM_SLUG = require_env("GITHUB_TEAM_SLUG") - - -def _safe_int(v: Any, default: int = 0) -> int: - """Coerce *v* to ``int``, returning *default* on failure.""" - try: - if v is None: - return default - if isinstance(v, int): - return v - s = str(v).strip() - if s == "": - return default - return int(float(s)) - except Exception: - return default - - -def _load_json(path: str) -> Optional[Any]: - """Load and return JSON from *path*, or ``None`` if the file is missing.""" - if not os.path.exists(path): - return None - with open(path, "r", encoding="utf-8") as f: - return json.load(f) - - -def _index_by_fingerprint(snapshot: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: - """Index snapshot items by their secmeta fingerprint.""" - idx: Dict[str, Dict[str, Any]] = {} - for item in snapshot: - fp = (item.get("secmeta") or {}).get("fingerprint") - if not fp: - # If secmeta is missing or malformed, it cannot participate in fingerprint-level stats. - continue - idx[fp] = item - return idx - - -def _severity_from_labels(labels: List[str]) -> str: - """Extract the severity token from ``sec:sev/`` labels.""" - for l in labels: - if l.startswith("sec:sev/"): - return l.split("/", 1)[1] - return "unknown" - - -def main() -> None: - """Derive and write team security metrics from issue snapshots.""" - # TODO decide about changes related to this script - logging.warning( - "This script is deprecated and may be removed in the future. Please refer to the updated documentation for deriving security metrics." - ) - return - - cur = _load_json(SNAPSHOT_CUR) - if cur is None: - raise SystemExit(f"Missing current snapshot: {SNAPSHOT_CUR}") - - if not isinstance(cur, list): - raise SystemExit(f"Current snapshot is not a list: {SNAPSHOT_CUR}") - - prev = _load_json(SNAPSHOT_PREV) - - cur_idx = _index_by_fingerprint(cur) - prev_idx = _index_by_fingerprint(prev) if isinstance(prev, list) else {} - - # Basic counts - total = len(cur) - by_sev: Dict[str, int] = {} - postponed = 0 - needs_review = 0 - - for item in cur: - labels = item.get("labels") or [] - sev = _severity_from_labels(labels) - by_sev[sev] = by_sev.get(sev, 0) + 1 - if "sec:state/postponed" in labels: - postponed += 1 - if "sec:state/needs-review" in labels: - needs_review += 1 - - # Resurfacing (B): prev occurrence_count == 0 and current > 0 - resurfaced: List[Dict[str, Any]] = [] - if prev_idx: - for fp, cur_item in cur_idx.items(): - cur_occ = _safe_int((cur_item.get("secmeta") or {}).get("occurrence_count"), 0) - prev_item = prev_idx.get(fp) - prev_occ = _safe_int(((prev_item or {}).get("secmeta") or {}).get("occurrence_count"), 0) - if prev_item is not None and prev_occ == 0 and cur_occ > 0: - resurfaced.append( - { - "fingerprint": fp, - "repo": cur_item.get("repo"), - "issue_number": cur_item.get("issue_number"), - "title": cur_item.get("title"), - "severity": _severity_from_labels(cur_item.get("labels") or []), - "prev_occurrence_count": prev_occ, - "current_occurrence_count": cur_occ, - } - ) - - metrics = { - "team": TEAM_SLUG, - "generated_at_utc": datetime.utcnow().isoformat() + "Z", - "snapshot_current": SNAPSHOT_CUR, - "snapshot_previous": SNAPSHOT_PREV if prev_idx else None, - "counts": { - "total_security_issues": total, - "postponed": postponed, - "needs_review": needs_review, - "by_severity": dict(sorted(by_sev.items())), - }, - "resurfaced": { - "definition": "B: fingerprint occurrence_count from 0 (previous snapshot) to >0 (current snapshot)", - "count": len(resurfaced), - "items": resurfaced, - }, - } - - os.makedirs(os.path.dirname(OUT_METRICS_JSON), exist_ok=True) - os.makedirs(os.path.dirname(OUT_SUMMARY_MD), exist_ok=True) - - with open(OUT_METRICS_JSON, "w", encoding="utf-8") as f: - json.dump(metrics, f, indent=2) - - # Append to (or create) summary.md - summary_lines: List[str] = [] - summary_lines.append(f"\n## Derived metrics\n") - summary_lines.append(f"Generated at: {metrics['generated_at_utc']}\n") - if metrics["snapshot_previous"] is None: - summary_lines.append("- Resurfacing: not computed (no previous snapshot found)\n") - else: - summary_lines.append(f"- Resurfaced fingerprints (definition B): {metrics['resurfaced']['count']}\n") - if resurfaced: - summary_lines.append("\n### Resurfaced items\n") - for r in resurfaced[:50]: - summary_lines.append( - f"- {r['severity']} {r['repo']}#{r['issue_number']} (occ {r['prev_occurrence_count']} -> {r['current_occurrence_count']}): {r['title']}\n" - ) - if len(resurfaced) > 50: - summary_lines.append(f"- ... and {len(resurfaced) - 50} more\n") - - # Ensure summary exists; if not, create a minimal header. - if not os.path.exists(OUT_SUMMARY_MD): - with open(OUT_SUMMARY_MD, "w", encoding="utf-8") as f: - f.write(f"# Security summary for team `{TEAM_SLUG}`\n\n") - - with open(OUT_SUMMARY_MD, "a", encoding="utf-8") as f: - f.writelines(summary_lines) - - -if __name__ == "__main__": - main() diff --git a/src/security/extract_team_security_stats.py b/src/security/extract_team_security_stats.py deleted file mode 100644 index da1b6ac..0000000 --- a/src/security/extract_team_security_stats.py +++ /dev/null @@ -1,206 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright 2026 ABSA Group Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# pylint: skip-file -""" -Extract security statistics per GitHub team. - -Model: -- Source of truth: GitHub Issues -- Scope: repositories owned by a given GitHub team -- Unit: one Issue = one logical vulnerability (fingerprint) - -Outputs: -- data/issues_snapshot.json -- data/events_flat.csv -- reports/summary.md -""" - -import csv -import json -import logging -import os -import re -from datetime import datetime -from github import Github - - -# -------------------- -# Configuration -# -------------------- -def require_env(key: str) -> str: - """Return the value of environment variable *key*, or exit.""" - try: - return os.environ[key] - except KeyError as exc: - raise SystemExit(f"Missing required environment variable: {key}") from exc - - -GITHUB_TOKEN = require_env("GITHUB_TOKEN") -ORG = require_env("GITHUB_ORG") -TEAM_SLUG = require_env("GITHUB_TEAM_SLUG") - -OUT_DATA = "data" -OUT_REPORTS = "reports" - -SEC_LABEL_PREFIX = "sec:" - -SEC_EVENT_RE = re.compile(r"\[sec-event\](.*?)\[/sec-event\]", re.S) -SECMETA_RE = re.compile(r"```secmeta(.*?)```", re.S) - -# -------------------- -# Helpers -# -------------------- - - -def ensure_dirs(): - """Create output directories if they don't exist.""" - os.makedirs(OUT_DATA, exist_ok=True) - os.makedirs(OUT_REPORTS, exist_ok=True) - - -def parse_kv_block(block: str) -> dict: - """Parse a ``key=value``-per-line block into a dict.""" - data = {} - for line in block.splitlines(): - line = line.strip() - if not line or "=" not in line: - continue - k, v = line.split("=", 1) - data[k.strip()] = v.strip() - return data - - -def parse_secmeta(body: str) -> dict: - """Extract the secmeta key-value block from an issue body.""" - match = SECMETA_RE.search(body or "") - if not match: - return {} - return parse_kv_block(match.group(1)) - - -def parse_events(comments): - """Extract ``[sec-event]`` blocks from issue comments.""" - events = [] - for c in comments: - for raw in SEC_EVENT_RE.findall(c.body or ""): - evt = parse_kv_block(raw) - evt["timestamp"] = c.created_at.isoformat() - events.append(evt) - return events - - -def issue_has_sec_label(issue): - """Return ``True`` if *issue* carries any ``sec:`` prefixed label.""" - return any(l.name.startswith(SEC_LABEL_PREFIX) for l in issue.labels) - - -# -------------------- -# Main extraction -# -------------------- - - -def main(): - """Extract security statistics from GitHub Issues for the configured team.""" - # TODO decide about changes related to this script - logging.warning( - "This script is deprecated and may be removed in the future. Please refer to the updated documentation for deriving security metrics." - ) - return - - ensure_dirs() - - gh = Github(GITHUB_TOKEN) - org = gh.get_organization(ORG) - team = org.get_team_by_slug(TEAM_SLUG) - - repos = list(team.get_repos()) - - snapshot = [] - flat_events = [] - - for repo in repos: - issues = repo.get_issues(state="all") - for issue in issues: - # Skip PRs that may be returned by the issues API - if getattr(issue, "pull_request", None): - continue - - if not issue_has_sec_label(issue): - continue - - secmeta = parse_secmeta(issue.body or "") - events = parse_events(issue.get_comments()) - - snapshot.append( - { - "repo": repo.full_name, - "issue_number": issue.number, - "title": issue.title, - "state": issue.state, - "labels": [l.name for l in issue.labels], - "secmeta": secmeta, - "created_at": issue.created_at.isoformat(), - "updated_at": issue.updated_at.isoformat(), - "event_count": len(events), - } - ) - - for e in events: - fp = secmeta.get("fingerprint") if secmeta else None - if not fp: - continue # ignore events without a fingerprint - flat_events.append( - { - "repo": repo.full_name, - "issue_number": issue.number, - "fingerprint": fp, - "action": e.get("action"), - "reason": e.get("reason"), - "timestamp": e.get("timestamp"), - } - ) - - # Write snapshot - with open(os.path.join(OUT_DATA, "issues_snapshot.json"), "w") as f: - json.dump(snapshot, f, indent=2) - - # Write flat events - with open(os.path.join(OUT_DATA, "events_flat.csv"), "w", newline="") as f: - writer = csv.DictWriter(f, fieldnames=["repo", "issue_number", "fingerprint", "action", "reason", "timestamp"]) - writer.writeheader() - writer.writerows(flat_events) - - # Summary report - total = len(snapshot) - by_sev = {} - - for item in snapshot: - sev = next((l for l in item["labels"] if l.startswith("sec:sev/")), "sec:sev/unknown") - by_sev[sev] = by_sev.get(sev, 0) + 1 - - with open(os.path.join(OUT_REPORTS, "summary.md"), "w") as f: - f.write(f"# Security summary for team `{TEAM_SLUG}`\n\n") - f.write(f"Generated at: {datetime.utcnow().isoformat()} UTC\n\n") - f.write(f"## Total security issues: {total}\n\n") - f.write("## By severity\n\n") - for sev, cnt in sorted(by_sev.items()): - f.write(f"- {sev}: {cnt}\n") - - -if __name__ == "__main__": - main() diff --git a/src/security/issues/builder.py b/src/security/issues/builder.py index a2c4cea..fed75c9 100644 --- a/src/security/issues/builder.py +++ b/src/security/issues/builder.py @@ -87,7 +87,7 @@ def build_parent_template_values(alert: Alert, *, rule_id: str, severity: str) - return { "category": alert.metadata.rule_name or NOT_AVAILABLE, "avd_id": alert.alert_details.vulnerability or rule_id, - "title": rule_id, + "title": alert.metadata.rule_description or rule_id, "severity": severity, "published_date": iso_date(alert.rule_details.published_date or NOT_AVAILABLE), "package_name": alert.rule_details.package_name, @@ -103,16 +103,10 @@ def build_parent_issue_body(alert: Alert) -> str: repo_full = alert.repo secmeta: dict[str, str] = { - "schema": "1", "type": SECMETA_TYPE_PARENT, "repo": repo_full, - "source": "code_scanning", - "tool": alert.metadata.tool, - "severity": severity, "rule_id": rule_id, - "first_seen": iso_date(alert.metadata.created_at), - "last_seen": iso_date(alert.metadata.updated_at), - "postponed_until": "", + "severity": severity, } values = build_parent_template_values(alert, rule_id=rule_id, severity=severity) @@ -120,10 +114,15 @@ def build_parent_issue_body(alert: Alert) -> str: return render_secmeta(secmeta) + "\n\n" + human_body -def build_issue_title(rule_name: str | None, rule_id: str, fingerprint: str) -> str: +def build_issue_title( + rule_description: str | None, + rule_name: str | None, + rule_id: str, + fingerprint: str, +) -> str: """Build the title string for a child issue.""" prefix = fingerprint[:8] if fingerprint else NOT_AVAILABLE - summary = (rule_name or rule_id or "Security finding").strip() or "Security finding" + summary = (rule_description or rule_name or rule_id or "Security finding").strip() return f"[SEC][FP={prefix}] {summary}" @@ -136,7 +135,7 @@ def build_child_issue_body(alert: Alert) -> str: vulnerability = alert.alert_details.vulnerability avd_id = vulnerability if vulnerability.startswith("AVD-") else NOT_AVAILABLE - title = alert.metadata.rule_id + title = alert.metadata.rule_description or alert.metadata.rule_id scm_file = alert.alert_details.scm_file start_line = alert.metadata.start_line @@ -169,7 +168,6 @@ def build_child_issue_body(alert: Alert) -> str: "installed_version": alert.alert_details.installed_version, "fixed_version": alert.rule_details.fixed_version, "reachable": alert.alert_details.reachable, - "scan_date": iso_date(alert.alert_details.scan_date or alert.metadata.updated_at or NOT_AVAILABLE), "first_seen": iso_date(alert.alert_details.first_seen or alert.metadata.created_at or NOT_AVAILABLE), } return render_markdown_template(CHILD_BODY_TEMPLATE, values).strip() + "\n" diff --git a/src/security/issues/models.py b/src/security/issues/models.py index d2c2d63..3bdaa9b 100644 --- a/src/security/issues/models.py +++ b/src/security/issues/models.py @@ -91,13 +91,11 @@ class AlertContext: alert: Alert alert_number: int fingerprint: str - occurrence_fp: str repo: str - first_seen: str - last_seen: str tool: str rule_id: str rule_name: str + rule_description: str severity: str cve: str path: str diff --git a/src/security/issues/sec_events.py b/src/security/issues/sec_events.py deleted file mode 100644 index 54625f4..0000000 --- a/src/security/issues/sec_events.py +++ /dev/null @@ -1,72 +0,0 @@ -# -# Copyright 2026 ABSA Group Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""``[sec-event]`` comment blocks – parsing, rendering, and stripping -structured lifecycle-event blocks from issue bodies. -""" - -import re - -from security.issues.secmeta import render_kv_lines - - -def parse_sec_event_fields(raw: str) -> dict[str, str]: - """Parse ``key=value`` lines from a raw sec-event block.""" - fields: dict[str, str] = {} - for line in (raw or "").splitlines(): - s = line.strip() - if not s or "=" not in s: - continue - k, v = s.split("=", 1) - fields[k.strip()] = v.strip() - return fields - - -def render_sec_event(fields: dict[str, str]) -> str: - """Render a structured ``[sec-event]`` comment block from *fields*.""" - preferred_order = [ - "action", - "seen_at", - "source", - "gh_alert_number", - "occurrence_fp", - "commit_sha", - "path", - "start_line", - "end_line", - ] - lines = ["[sec-event]"] - lines.extend(render_kv_lines(fields, preferred_order, skip_empty=True)) - lines.append("[/sec-event]") - return "\n".join(lines) + "\n" - - -def strip_sec_events_from_body(body: str) -> str: - """Remove any legacy sec-event content from an issue body. - - - Drops a dedicated '## Security Events' section if present (from previous versions). - - Removes any inline [sec-event] blocks. - """ - - text = body or "" - # Drop everything from the header onward (the section was intended to be last). - m = re.search(r"\n##\s+Security\s+Events\s*\n", text, flags=re.IGNORECASE) - if m: - text = text[: m.start()].rstrip() + "\n" - # Remove any inline blocks. - text = re.compile(r"\[sec-event\]\s*(.*?)\s*\[/sec-event\]", re.S).sub("", text) - text = re.sub(r"\n{3,}", "\n\n", text).strip() + "\n" - return text diff --git a/src/security/issues/secmeta.py b/src/security/issues/secmeta.py index fc1342b..f816ecc 100644 --- a/src/security/issues/secmeta.py +++ b/src/security/issues/secmeta.py @@ -79,22 +79,12 @@ def render_kv_lines( def render_secmeta(secmeta: dict[str, str]) -> str: """Render a secmeta dict as a hidden HTML-comment block for issue bodies.""" preferred_order = [ - "schema", + "type", "fingerprint", "repo", - "source", - "tool", - "severity", - "cve", - "category", "rule_id", - "first_seen", - "last_seen", - "last_seen_commit", - "postponed_until", + "severity", "gh_alert_numbers", - "occurrence_count", - "last_occurrence_fp", ] lines = render_kv_lines(secmeta, preferred_order) return "" diff --git a/src/security/issues/sync.py b/src/security/issues/sync.py index f414f0f..95ac10e 100644 --- a/src/security/issues/sync.py +++ b/src/security/issues/sync.py @@ -24,31 +24,28 @@ import logging -from core.helpers import iso_date, normalize_path, utc_today +from core.helpers import normalize_path from core.github.issues import ( gh_issue_add_labels, gh_issue_add_sub_issue_by_number, - gh_issue_comment, gh_issue_create, gh_issue_edit_body, gh_issue_edit_state, gh_issue_edit_title, gh_issue_get_sub_issue_numbers, + gh_issue_remove_labels, ) from core.github.projects import ProjectPrioritySync, gh_project_get_priority_field from core.models import Issue from core.rendering import render_markdown_template from security.alerts.models import Alert -from security.alerts.parser import compute_occurrence_fp from security.constants import ( LABEL_EPIC, LABEL_SCOPE_SECURITY, LABEL_SEC_ADEPT_TO_CLOSE, LABEL_TYPE_TECH_DEBT, NOT_AVAILABLE, - SEC_EVENT_OPEN, - SEC_EVENT_REOPEN, SECMETA_TYPE_CHILD, SECMETA_TYPE_PARENT, ) @@ -61,7 +58,6 @@ classify_category, ) from .models import AlertContext, IssueIndex, NotifiedIssue, SeverityChange, SyncContext, SyncResult -from .sec_events import render_sec_event, strip_sec_events_from_body from .secmeta import json_list, load_secmeta, parse_json_list, render_secmeta from .templates import PARENT_BODY_TEMPLATE @@ -127,30 +123,11 @@ def maybe_reopen_parent_issue( context, child_issue_number or "", ) - logging.info( - "DRY-RUN: would comment parent reopen sec-event on issue #%d (rule_id=%s)", - parent_issue.number, - rule_id, - ) parent_issue.state = "open" return if gh_issue_edit_state(repo, parent_issue.number, "open"): parent_issue.state = "open" - gh_issue_comment( - repo, - parent_issue.number, - render_sec_event( - { - "action": SEC_EVENT_REOPEN, - "seen_at": utc_today(), - "source": "code_scanning", - "rule_id": rule_id, - "context": context, - "child_issue": str(child_issue_number) if child_issue_number else "", - } - ), - ) def _close_resolved_parent_issues( @@ -232,11 +209,7 @@ def ensure_parent_issue( existing = find_parent_issue(index, rule_id=rule_id) if existing is not None: # Keep parent issues aligned to the template as alerts evolve. - existing_secmeta = load_secmeta(existing.body) or {"schema": "1"} - existing_first = existing_secmeta.get("first_seen") or iso_date(alert.metadata.created_at) - existing_last = existing_secmeta.get("last_seen") or iso_date(alert.metadata.updated_at) - first_seen_final = min(existing_first, iso_date(alert.metadata.created_at)) - last_seen_final = max(existing_last, iso_date(alert.metadata.updated_at)) + existing_secmeta = load_secmeta(existing.body) or {} existing_severity = str(existing_secmeta.get("severity") or "unknown") existing_severity_cmp = existing_severity.lower() @@ -253,7 +226,7 @@ def ensure_parent_issue( ) if dry_run: logging.info( - "DRY-RUN: severity change on parent #%d (rule_id=%s): %s \u2192 %s", + "DRY-RUN: severity change on parent #%d (rule_id=%s): %s - %s", existing.number, rule_id, existing_severity_cmp, @@ -266,16 +239,10 @@ def ensure_parent_issue( existing_secmeta.update( { - "schema": existing_secmeta.get("schema") or "1", "type": SECMETA_TYPE_PARENT, "repo": repo_full, - "source": existing_secmeta.get("source") or "code_scanning", - "tool": alert.metadata.tool or existing_secmeta.get("tool") or "", "severity": severity_stored, "rule_id": rule_id, - "first_seen": first_seen_final, - "last_seen": last_seen_final, - "postponed_until": existing_secmeta.get("postponed_until", ""), } ) @@ -288,7 +255,6 @@ def ensure_parent_issue( ).strip() + "\n" ) - rebuilt = strip_sec_events_from_body(rebuilt) # Snapshot the original body on first encounter so we can # defer the API call until all alerts have been processed. @@ -330,24 +296,6 @@ def ensure_parent_issue( if num is None: return None - # Parent lifecycle event (human visible): opened/created. - if dry_run: - logging.info("DRY-RUN: would comment parent open sec-event on issue #%d (rule_id=%s)", num, rule_id) - else: - gh_issue_comment( - repo_full, - num, - render_sec_event( - { - "action": SEC_EVENT_OPEN, - "seen_at": iso_date(alert.metadata.created_at), - "source": "code_scanning", - "rule_id": rule_id, - "severity": alert.metadata.severity, - } - ), - ) - created = Issue(number=num, state="open", title=title, body=body) issues[num] = created index.parent_by_rule_id[rule_id] = created @@ -397,29 +345,17 @@ def _handle_new_child_issue( """Create a new child issue for an alert that has no matching issue yet.""" category = classify_category(ctx.alert) secmeta: dict[str, str] = { - "schema": "1", "type": SECMETA_TYPE_CHILD, "fingerprint": ctx.fingerprint, "repo": ctx.repo, - "source": "code_scanning", - "tool": ctx.tool, - "severity": ctx.severity, - "category": category, "rule_id": ctx.rule_id, - "first_seen": ctx.first_seen, - "last_seen": ctx.last_seen, - "last_seen_commit": ctx.commit_sha, - "postponed_until": "", + "severity": ctx.severity, "gh_alert_numbers": json_list([str(ctx.alert_number)]), - "occurrence_count": "1", - "last_occurrence_fp": ctx.occurrence_fp, } - if ctx.cve: - secmeta["cve"] = ctx.cve human_body = build_child_issue_body(ctx.alert) body = render_secmeta(secmeta) + "\n\n" + human_body - title = build_issue_title(ctx.rule_name, ctx.rule_id, ctx.fingerprint) + title = build_issue_title(ctx.rule_description, ctx.rule_name, ctx.rule_id, ctx.fingerprint) if sync.dry_run: labels = [LABEL_SCOPE_SECURITY, LABEL_TYPE_TECH_DEBT] @@ -428,8 +364,7 @@ def _handle_new_child_issue( commit_short = ctx.commit_sha[:8] if ctx.commit_sha else "" logging.info( "DRY-RUN: create child alert=%d rule_id=%s sev=%s" - " fp=%s tool=%s commit=%s loc=%s title=%r labels=[%s]" - " | secmeta:first_seen=%s last_seen=%s occurrence_count=1 gh_alert_numbers=[%d]", + " fp=%s tool=%s commit=%s loc=%s title=%r labels=[%s] gh_alert_numbers=[%d]", ctx.alert_number, ctx.rule_id, ctx.severity, @@ -439,8 +374,6 @@ def _handle_new_child_issue( loc, title, ",".join(labels), - ctx.first_seen, - ctx.last_seen, ctx.alert_number, ) if parent_issue is None and ctx.rule_id: @@ -504,28 +437,32 @@ def _handle_new_child_issue( logging.info("Add sub-issue link parent=#%d child=#%d (alert %d)", parent_issue.number, num, ctx.alert_number) gh_issue_add_sub_issue_by_number(ctx.repo, parent_issue.number, num) - gh_issue_comment( - ctx.repo, - num, - render_sec_event( - { - "action": SEC_EVENT_OPEN, - "seen_at": ctx.first_seen, - "source": "code_scanning", - "gh_alert_number": str(ctx.alert_number), - "occurrence_fp": str(ctx.occurrence_fp), - "commit_sha": str(ctx.commit_sha), - "path": str(ctx.path), - "start_line": str(ctx.start_line or ""), - "end_line": str(ctx.end_line or ""), - } - ), - ) - if sync.priority_sync is not None: sync.priority_sync.enqueue(ctx.repo, num, ctx.severity, sync.severity_priority_map) +def _remove_adept_to_close_label(repo: str, issue: Issue, *, dry_run: bool) -> None: + """Remove the ``sec:adept-to-close`` label from *issue* if present.""" + if not issue.labels or LABEL_SEC_ADEPT_TO_CLOSE not in issue.labels: + return + + if dry_run: + logging.info( + "DRY-RUN: would remove label %r from issue #%d", + LABEL_SEC_ADEPT_TO_CLOSE, + issue.number, + ) + else: + logging.info( + "Security - Removing label %r from reopened issue #%d", + LABEL_SEC_ADEPT_TO_CLOSE, + issue.number, + ) + gh_issue_remove_labels(repo, issue.number, [LABEL_SEC_ADEPT_TO_CLOSE]) + + issue.labels.remove(LABEL_SEC_ADEPT_TO_CLOSE) + + def _maybe_reopen_child( *, ctx: AlertContext, @@ -549,6 +486,7 @@ def _maybe_reopen_child( logging.info("Reopened issue #%d (alert %d)", issue.number, ctx.alert_number) if reopened: + _remove_adept_to_close_label(ctx.repo, issue, dry_run=sync.dry_run) maybe_reopen_parent_issue( ctx.repo, parent_issue, @@ -578,12 +516,9 @@ def _merge_child_secmeta( *, ctx: AlertContext, issue: Issue, -) -> tuple[dict[str, str], bool]: - """Merge incoming alert data into the child issue's secmeta. - - Returns ``(updated_secmeta, new_occurrence)``. - """ - secmeta = load_secmeta(issue.body) or {"schema": "1"} +) -> dict[str, str]: + """Merge incoming alert data into the child issue's secmeta.""" + secmeta = load_secmeta(issue.body) or {} secmeta.pop("alert_hash", None) existing_alerts = parse_json_list(secmeta.get("gh_alert_numbers")) @@ -592,40 +527,18 @@ def _merge_child_secmeta( if str(ctx.alert_number) not in existing_alerts: existing_alerts.append(str(ctx.alert_number)) - last_occ_fp = secmeta.get("last_occurrence_fp", "") - occurrence_count = int(secmeta.get("occurrence_count") or "0" or 0) - new_occurrence = bool(ctx.occurrence_fp and ctx.occurrence_fp != last_occ_fp) - if occurrence_count <= 0: - occurrence_count = 1 - if new_occurrence: - occurrence_count += 1 - - existing_first = secmeta.get("first_seen") or ctx.first_seen - existing_last = secmeta.get("last_seen") or ctx.last_seen - first_seen_final = min(existing_first, ctx.first_seen) - last_seen_final = max(existing_last, ctx.last_seen) - secmeta.update( { + "type": SECMETA_TYPE_CHILD, "fingerprint": ctx.fingerprint, "repo": ctx.repo, - "source": secmeta.get("source") or "code_scanning", - "tool": ctx.tool or secmeta.get("tool", ""), - "severity": ctx.severity, - "category": classify_category(ctx.alert) or secmeta.get("category", ""), "rule_id": ctx.rule_id or secmeta.get("rule_id", ""), - "first_seen": first_seen_final, - "last_seen": last_seen_final, - "last_seen_commit": ctx.commit_sha or secmeta.get("last_seen_commit", ""), + "severity": ctx.severity, "gh_alert_numbers": json_list(existing_alerts), - "occurrence_count": str(occurrence_count), - "last_occurrence_fp": ctx.occurrence_fp or last_occ_fp, } ) - if ctx.cve: - secmeta["cve"] = ctx.cve - return secmeta, new_occurrence + return secmeta def _rebuild_and_apply_child_body( @@ -638,7 +551,6 @@ def _rebuild_and_apply_child_body( """Render a fresh child body from *secmeta* + template and apply if changed.""" human_body = build_child_issue_body(ctx.alert) new_body = render_secmeta(secmeta) + "\n\n" + human_body - new_body = strip_sec_events_from_body(new_body) if new_body != issue.body: if sync.dry_run: @@ -652,32 +564,6 @@ def _rebuild_and_apply_child_body( issue.body = new_body -def _comment_child_event( - *, - ctx: AlertContext, - sync: SyncContext, - issue: Issue, - reopened: bool, -) -> None: - """Post a reopen sec-event comment on the child issue.""" - if reopened: - if sync.dry_run: - logging.info("DRY-RUN: would comment reopen event on issue #%d (alert %d)", issue.number, ctx.alert_number) - else: - gh_issue_comment( - ctx.repo, - issue.number, - render_sec_event( - { - "action": SEC_EVENT_REOPEN, - "seen_at": utc_today(), - "source": "code_scanning", - "gh_alert_number": str(ctx.alert_number), - } - ), - ) - - def _sync_child_title_and_labels( *, ctx: AlertContext, @@ -685,7 +571,7 @@ def _sync_child_title_and_labels( issue: Issue, ) -> None: """Fix title drift and ensure required labels and priority on the child issue.""" - expected_title = build_issue_title(ctx.rule_name, ctx.rule_id, ctx.fingerprint) + expected_title = build_issue_title(ctx.rule_description, ctx.rule_name, ctx.rule_id, ctx.fingerprint) if expected_title != (issue.title or ""): if sync.dry_run: logging.info( @@ -758,10 +644,9 @@ def _handle_existing_child_issue( if parent_issue is None and ctx.rule_id: parent_issue = find_parent_issue(sync.index, rule_id=ctx.rule_id) - reopened = _maybe_reopen_child(ctx=ctx, sync=sync, issue=issue, parent_issue=parent_issue) - secmeta, _ = _merge_child_secmeta(ctx=ctx, issue=issue) + _maybe_reopen_child(ctx=ctx, sync=sync, issue=issue, parent_issue=parent_issue) + secmeta = _merge_child_secmeta(ctx=ctx, issue=issue) _rebuild_and_apply_child_body(ctx=ctx, sync=sync, issue=issue, secmeta=secmeta) - _comment_child_event(ctx=ctx, sync=sync, issue=issue, reopened=reopened) _sync_child_title_and_labels(ctx=ctx, sync=sync, issue=issue) if parent_issue is not None: @@ -801,11 +686,7 @@ def ensure_issue( "Ensure the collector/scanner includes an 'Alert hash: ...' line." ) - occurrence_fp = compute_occurrence_fp(commit_sha, path, start_line, end_line) - repo_full = alert.repo - first_seen = iso_date(alert.metadata.created_at) - last_seen = iso_date(alert.metadata.updated_at) parent_issue = ensure_parent_issue( alert, @@ -826,13 +707,11 @@ def ensure_issue( alert=alert, alert_number=alert_number, fingerprint=fingerprint, - occurrence_fp=occurrence_fp, repo=repo_full, - first_seen=first_seen, - last_seen=last_seen, tool=alert.metadata.tool, rule_id=rule_id, rule_name=alert.metadata.rule_name, + rule_description=alert.metadata.rule_description, severity=alert.metadata.severity, cve=cve, path=path, diff --git a/src/security/issues/templates.py b/src/security/issues/templates.py index f90aa2d..d2d7ede 100644 --- a/src/security/issues/templates.py +++ b/src/security/issues/templates.py @@ -81,6 +81,5 @@ ## Detection Timeline -- **Scan date:** {{ scan_date }} - **First seen:** {{ first_seen }} """ diff --git a/tests/core/github/test_issues.py b/tests/core/github/test_issues.py index d03f7dc..3fbdf45 100644 --- a/tests/core/github/test_issues.py +++ b/tests/core/github/test_issues.py @@ -41,6 +41,7 @@ gh_issue_get_rest_id, gh_issue_get_sub_issue_numbers, gh_issue_list_by_label, + gh_issue_remove_labels, ) @@ -329,6 +330,19 @@ def test_add_labels_not_found_hint(mocker: MockerFixture, caplog) -> None: assert any("deleted or transferred" in r.message for r in caplog.records) +# gh_issue_remove_labels + +def test_remove_labels_success(mocker: MockerFixture) -> None: + mock_run = mocker.patch("core.github.issues.run_gh", return_value=_ok()) + gh_issue_remove_labels("org/repo", 1, ["sec:adept-to-close"]) + mock_run.assert_called_once() + +def test_remove_labels_no_labels_skips_call(mocker: MockerFixture) -> None: + mock_run = mocker.patch("core.github.issues.run_gh") + gh_issue_remove_labels("org/repo", 1, []) + mock_run.assert_not_called() + + def test_create_issue_success_url(mocker: MockerFixture) -> None: mocker.patch( "core.github.issues.run_gh", diff --git a/tests/security/alerts/test_models.py b/tests/security/alerts/test_models.py index 1de2f72..cfd55c3 100644 --- a/tests/security/alerts/test_models.py +++ b/tests/security/alerts/test_models.py @@ -30,12 +30,14 @@ def test_alert_metadata_none_fields_do_not_crash() -> None: severity=None, # type: ignore[arg-type] – mirrors _normalise_alert output rule_id=None, # type: ignore[arg-type] rule_name=None, # type: ignore[arg-type] + rule_description=None, # type: ignore[arg-type] state=None, # type: ignore[arg-type] tool=None, # type: ignore[arg-type] ) assert md.severity == "unknown" assert md.rule_id == "" assert md.rule_name == "" + assert md.rule_description == "" assert md.state == "" assert md.tool == "" diff --git a/tests/security/conftest.py b/tests/security/conftest.py index a475954..7edd1aa 100644 --- a/tests/security/conftest.py +++ b/tests/security/conftest.py @@ -38,6 +38,7 @@ "alert_url": "https://github.com/test-org/test-repo/security/code-scanning/303", "rule_id": "req-with-very-false-aquasec-python", "rule_name": "sast", + "rule_description": "Requests with verify=False", "severity": "high", "confidence": "error", "tags": ["HIGH", "sast", "security"], @@ -108,6 +109,7 @@ "alert_url": "https://github.com/test-org/test-repo/security/code-scanning/312", "rule_id": "CVE-2026-25755", "rule_name": "vulnerabilities", + "rule_description": "jsPDF PDF object injection", "severity": "high", "confidence": "error", "tags": ["HIGH", "security", "vulnerabilities"], @@ -179,6 +181,7 @@ "alert_url": "https://github.com/test-org/test-repo/security/code-scanning/317", "rule_id": "AVD-PIPELINE-0008", "rule_name": "pipelineMisconfigurations", + "rule_description": "Dependency not pinned to commit SHA", "severity": "medium", "confidence": "warning", "tags": ["MEDIUM", "pipelineMisconfigurations", "security"], diff --git a/tests/security/issues/test_builder.py b/tests/security/issues/test_builder.py index d182e8f..c350ab9 100644 --- a/tests/security/issues/test_builder.py +++ b/tests/security/issues/test_builder.py @@ -278,19 +278,23 @@ def test_contains_confidence(vuln_alert: Alert) -> None: def test_format() -> None: fp = "a1b2c3d4e5f6" - title = build_issue_title("sast", "rule-123", fp) - assert title == "[SEC][FP=a1b2c3d4] sast" + title = build_issue_title("A description", "sast", "rule-123", fp) + assert title == "[SEC][FP=a1b2c3d4] A description" + +def test_fallback_to_rule_name() -> None: + title = build_issue_title(None, "sast", "rule-123", "abcdef12") + assert "sast" in title def test_fallback_to_rule_id() -> None: - title = build_issue_title(None, "rule-123", "abcdef12") + title = build_issue_title(None, None, "rule-123", "abcdef12") assert "rule-123" in title def test_fallback_to_default() -> None: - title = build_issue_title(None, "", "abcdef12") + title = build_issue_title(None, None, "", "abcdef12") assert "Security finding" in title def test_empty_fingerprint() -> None: - title = build_issue_title("sast", "rule-123", "") + title = build_issue_title("A description", "sast", "rule-123", "") assert "N/A" in title @@ -303,7 +307,7 @@ def test_empty_fingerprint() -> None: def test_sast_avd_id(sast_alert: Alert) -> None: body = build_child_issue_body(sast_alert) - assert "req-with-very-false-aquasec-python" in body + assert "Requests with verify=False" in body def test_sast_alert_hash(sast_alert: Alert) -> None: body = build_child_issue_body(sast_alert) @@ -311,7 +315,7 @@ def test_sast_alert_hash(sast_alert: Alert) -> None: def test_sast_title(sast_alert: Alert) -> None: body = build_child_issue_body(sast_alert) - assert "sast" in body + assert "Requests with verify=False" in body def test_sast_message_present(sast_alert: Alert) -> None: body = build_child_issue_body(sast_alert) @@ -333,10 +337,6 @@ def test_sast_reachable_from_msg(sast_alert: Alert) -> None: body = build_child_issue_body(sast_alert) assert "False" in body -def test_sast_scan_date(sast_alert: Alert) -> None: - body = build_child_issue_body(sast_alert) - assert "2026-02-24" in body - def test_sast_first_seen(sast_alert: Alert) -> None: body = build_child_issue_body(sast_alert) assert "2025-09-17" in body @@ -345,7 +345,7 @@ def test_sast_first_seen(sast_alert: Alert) -> None: def test_vuln_avd_id(vuln_alert: Alert) -> None: body = build_child_issue_body(vuln_alert) - assert "CVE-2026-25755" in body + assert "jsPDF PDF object injection" in body def test_vuln_installed_version(vuln_alert: Alert) -> None: body = build_child_issue_body(vuln_alert) @@ -410,21 +410,6 @@ def test_all_template_sections_rendered(vuln_alert: Alert) -> None: assert "## Detection Timeline" in body -def test_scan_date_falls_back_to_metadata_updated_at() -> None: - """When alert_details.scan_date is absent, fall back to metadata.updated_at.""" - alert = Alert.from_dict({ - "metadata": { - "rule_id": "X", - "updated_at": "2026-01-15T10:00:00Z", - "created_at": "2025-12-01T08:00:00Z", - }, - "alert_details": {}, # scan_date absent → defaults to "" - "rule_details": {}, - }) - body = build_child_issue_body(alert) - assert "2026-01-15" in body - - def test_first_seen_falls_back_to_metadata_created_at() -> None: """When alert_details.first_seen is absent, fall back to metadata.created_at.""" alert = Alert.from_dict({ @@ -440,7 +425,7 @@ def test_first_seen_falls_back_to_metadata_created_at() -> None: assert "2025-12-01" in body -def test_scan_date_and_first_seen_yield_na_when_no_fallback() -> None: +def test_first_seen_yields_na_when_no_fallback() -> None: """When neither alert_details nor metadata provide dates, render N/A.""" alert = Alert.from_dict({ "metadata": {"rule_id": "X"}, # no updated_at / created_at @@ -448,4 +433,4 @@ def test_scan_date_and_first_seen_yield_na_when_no_fallback() -> None: "rule_details": {}, }) body = build_child_issue_body(alert) - assert body.count("N/A") >= 2 + assert body.count("N/A") >= 1 diff --git a/tests/security/issues/test_models.py b/tests/security/issues/test_models.py index a0ad502..9df3447 100644 --- a/tests/security/issues/test_models.py +++ b/tests/security/issues/test_models.py @@ -108,9 +108,10 @@ def test_issue_index_creation() -> None: def test_alert_context_creation() -> None: ctx = AlertContext( - alert={}, alert_number=1, fingerprint="fp", occurrence_fp="ofp", - repo="org/repo", first_seen="2026-01-01", last_seen="2026-01-02", + alert={}, alert_number=1, fingerprint="fp", + repo="org/repo", tool="AquaSec", rule_id="R1", rule_name="sast", + rule_description="Test finding description", severity="high", cve="CVE-79", path="src/f.py", start_line=10, end_line=20, commit_sha="abc123", ) diff --git a/tests/security/issues/test_sec_events.py b/tests/security/issues/test_sec_events.py deleted file mode 100644 index 849bc29..0000000 --- a/tests/security/issues/test_sec_events.py +++ /dev/null @@ -1,163 +0,0 @@ -# -# Copyright 2026 ABSA Group Limited -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Unit tests for ``utils.sec_events``.""" - -import pytest - -from security.issues.sec_events import ( - parse_sec_event_fields, - render_sec_event, - strip_sec_events_from_body, -) - - -# ===================================================================== -# parse_sec_event_fields -# ===================================================================== - - -def test_basic_kv() -> None: - raw = "action=created\nseen_at=2026-01-01\nsource=aquasec" - result = parse_sec_event_fields(raw) - assert result == { - "action": "created", - "seen_at": "2026-01-01", - "source": "aquasec", - } - -def test_ignores_blank_and_no_equals() -> None: - raw = "action=created\n\njust-text\nfoo=bar" - result = parse_sec_event_fields(raw) - assert result == {"action": "created", "foo": "bar"} - -def test_equals_in_value() -> None: - raw = "path=a=b" - result = parse_sec_event_fields(raw) - assert result == {"path": "a=b"} - -def test_empty_string() -> None: - assert parse_sec_event_fields("") == {} - -def test_none_input() -> None: - assert parse_sec_event_fields(None) == {} - -def test_strips_whitespace() -> None: - raw = " action = created \n seen_at = 2026-01-01 " - result = parse_sec_event_fields(raw) - assert result == {"action": "created", "seen_at": "2026-01-01"} - - -# ===================================================================== -# render_sec_event -# ===================================================================== - - -def test_renders_fields_in_preferred_order() -> None: - fields = { - "commit_sha": "abc123", - "action": "created", - "seen_at": "2026-01-01", - } - rendered = render_sec_event(fields) - lines = rendered.strip().splitlines() - assert lines[0] == "[sec-event]" - assert lines[-1] == "[/sec-event]" - # action should come before commit_sha (preferred order) - assert lines.index("action=created") < lines.index("commit_sha=abc123") - -def test_includes_non_preferred_keys_sorted() -> None: - fields = {"action": "created", "z_custom": "1", "a_extra": "2"} - rendered = render_sec_event(fields) - # extra keys sorted alphabetically after preferred - assert "a_extra=2" in rendered - assert "z_custom=1" in rendered - lines = rendered.strip().splitlines() - idx_a = lines.index("a_extra=2") - idx_z = lines.index("z_custom=1") - assert idx_a < idx_z - -def test_skips_blank_values() -> None: - fields = {"action": "created", "path": " ", "source": ""} - rendered = render_sec_event(fields) - assert "path=" not in rendered - assert "source=" not in rendered - assert "action=created" in rendered - -def test_roundtrip() -> None: - fields = { - "action": "created", - "seen_at": "2026-01-01", - "source": "aquasec", - "gh_alert_number": "42", - "occurrence_fp": "fp123", - } - rendered = render_sec_event(fields) - # Extract inner content (skip opening/closing tags) - inner = "\n".join(rendered.strip().splitlines()[1:-1]) - parsed = parse_sec_event_fields(inner) - assert parsed == fields - - -# ===================================================================== -# strip_sec_events_from_body -# ===================================================================== - - -def test_removes_inline_block() -> None: - body = "Some text\n[sec-event]\naction=created\n[/sec-event]\nMore text" - result = strip_sec_events_from_body(body) - assert "[sec-event]" not in result - assert "Some text" in result - assert "More text" in result - -def test_removes_section_header() -> None: - body = "Intro\n\n## Security Events\nold stuff\n" - result = strip_sec_events_from_body(body) - assert "## Security Events" not in result - assert "Intro" in result - -def test_empty_body() -> None: - result = strip_sec_events_from_body("") - assert result.strip() == "" - -def test_none_body() -> None: - result = strip_sec_events_from_body(None) - assert result.strip() == "" - -def test_no_events() -> None: - body = "Just regular body text\n" - result = strip_sec_events_from_body(body) - assert "Just regular body text" in result - -def test_multiple_inline_blocks() -> None: - body = ( - "Text\n" - "[sec-event]\naction=created\n[/sec-event]\n" - "Middle\n" - "[sec-event]\naction=reopened\n[/sec-event]\n" - "End\n" - ) - result = strip_sec_events_from_body(body) - assert "[sec-event]" not in result - assert "Text" in result - assert "Middle" in result - assert "End" in result - -def test_collapses_excessive_newlines() -> None: - body = "A\n\n\n\n\nB\n" - result = strip_sec_events_from_body(body) - assert "\n\n\n" not in result diff --git a/tests/security/issues/test_secmeta.py b/tests/security/issues/test_secmeta.py index 8fc28cc..1ac9017 100644 --- a/tests/security/issues/test_secmeta.py +++ b/tests/security/issues/test_secmeta.py @@ -115,10 +115,10 @@ def test_preferred_order() -> None: } rendered = render_secmeta(data) lines = rendered.strip().split("\n") - # schema should appear before fingerprint - schema_idx = next(i for i, l in enumerate(lines) if "schema=" in l) + # type should appear before fingerprint per preferred_order + type_idx = next(i for i, l in enumerate(lines) if "type=" in l) fp_idx = next(i for i, l in enumerate(lines) if "fingerprint=" in l) - assert schema_idx < fp_idx + assert type_idx < fp_idx def test_secmeta_roundtrip() -> None: """Render then parse should recover the original data.""" diff --git a/tests/security/issues/test_sync.py b/tests/security/issues/test_sync.py index bd40352..affc440 100644 --- a/tests/security/issues/test_sync.py +++ b/tests/security/issues/test_sync.py @@ -27,7 +27,6 @@ from security.issues.sync import ( _append_notification, _close_resolved_parent_issues, - _comment_child_event, _ensure_child_linked_to_parent, _flush_parent_body_updates, _handle_existing_child_issue, @@ -37,6 +36,7 @@ _maybe_reopen_child, _merge_child_secmeta, _rebuild_and_apply_child_body, + _remove_adept_to_close_label, _sync_child_title_and_labels, build_issue_index, ensure_issue, @@ -72,13 +72,11 @@ def _make_alert_context(**overrides: Any) -> AlertContext: alert=Alert(), alert_number=1, fingerprint="fp_test_123", - occurrence_fp="occ_fp_test", repo="test-org/test-repo", - first_seen="2026-01-01", - last_seen="2026-01-02", tool="AquaSec", rule_id="CVE-2026-1234", rule_name="sast", + rule_description="Test finding description", severity="high", cve="CVE-2026-1234", path="src/main.py", @@ -128,6 +126,7 @@ def test_indexes_parent_by_rule_id() -> None: assert "CVE-2026-1234" in idx.parent_by_rule_id assert idx.parent_by_rule_id["CVE-2026-1234"] is parent + def test_indexes_child_by_fingerprint() -> None: child = _issue_with_secmeta(2, { "type": "child", @@ -139,6 +138,7 @@ def test_indexes_child_by_fingerprint() -> None: assert "fp_abc123" in idx.by_fingerprint assert idx.by_fingerprint["fp_abc123"] is child + def test_parent_not_in_fingerprint_index() -> None: parent = _issue_with_secmeta(1, { "type": "parent", @@ -149,6 +149,7 @@ def test_parent_not_in_fingerprint_index() -> None: idx = build_issue_index(issues) assert "fp_parent" not in idx.by_fingerprint + def test_first_fingerprint_wins() -> None: """If two children share the same fingerprint, the first one wins.""" c1 = _issue_with_secmeta(10, {"type": "child", "fingerprint": "shared_fp"}) @@ -157,6 +158,7 @@ def test_first_fingerprint_wins() -> None: idx = build_issue_index(issues) assert idx.by_fingerprint["shared_fp"].number == 10 + def test_mixed_issues() -> None: parent = _issue_with_secmeta(1, {"type": "parent", "rule_id": "R1"}) child = _issue_with_secmeta(2, {"type": "child", "fingerprint": "fp1", "rule_id": "R1"}) @@ -165,11 +167,13 @@ def test_mixed_issues() -> None: assert "R1" in idx.parent_by_rule_id assert "fp1" in idx.by_fingerprint + def test_empty() -> None: idx = build_issue_index({}) assert idx.by_fingerprint == {} assert idx.parent_by_rule_id == {} + def test_alert_hash_fallback() -> None: """Fingerprint falls back to alert_hash when fingerprint is absent.""" child = _issue_with_secmeta(5, {"type": "child", "alert_hash": "hash_123"}) @@ -177,6 +181,7 @@ def test_alert_hash_fallback() -> None: idx = build_issue_index(issues) assert "hash_123" in idx.by_fingerprint + def test_no_secmeta_skipped() -> None: """Issue without secmeta block is silently skipped.""" issue = Issue(number=99, state="open", title="No meta", body="Plain body\n") @@ -198,6 +203,7 @@ def test_find_issue_found() -> None: assert result is not None assert result.number == 2 + def test_find_issue_not_found() -> None: idx = build_issue_index({}) assert find_issue_in_index(idx, fingerprint="missing") is None @@ -215,6 +221,7 @@ def test_find_parent_found() -> None: assert result is not None assert result.number == 1 + def test_find_parent_not_found() -> None: idx = build_issue_index({}) assert find_parent_issue(idx, rule_id="missing") is None @@ -233,6 +240,7 @@ def test_reopen_parent_none(mocker: MockerFixture) -> None: ) mock_edit.assert_not_called() + def test_reopen_parent_already_open() -> None: """No-op when parent is already open.""" parent = Issue(number=1, state="open", title="P", body="b") @@ -241,6 +249,7 @@ def test_reopen_parent_already_open() -> None: ) assert parent.state == "open" + def test_reopen_parent_dry_run() -> None: """In dry-run mode, sets state to open without calling gh.""" parent = Issue(number=1, state="closed", title="P", body="b") @@ -249,20 +258,17 @@ def test_reopen_parent_dry_run() -> None: ) assert parent.state == "open" + def test_reopen_parent_real(mocker: MockerFixture) -> None: - """Non-dry-run reopens issue and posts sec-event comment.""" + """Non-dry-run reopens issue via state edit.""" mock_edit_state = mocker.patch("security.issues.sync.gh_issue_edit_state", return_value=True) - mock_comment = mocker.patch("security.issues.sync.gh_issue_comment") parent = Issue(number=1, state="closed", title="P", body="b") maybe_reopen_parent_issue( "org/repo", parent, rule_id="R1", dry_run=False, context="reopen_child", child_issue_number=5, ) assert parent.state == "open" mock_edit_state.assert_called_once_with("org/repo", 1, "open") - mock_comment.assert_called_once() - comment_body = mock_comment.call_args[0][2] - assert "reopen" in comment_body - assert "R1" in comment_body + def test_reopen_parent_gh_failure(mocker: MockerFixture) -> None: """If gh_issue_edit_state fails, state stays closed.""" @@ -288,6 +294,7 @@ def test_append_notification_active() -> None: assert len(notifications) == 1 assert notifications[0].issue_number == 42 + def test_append_notification_none() -> None: """No-op when notifications is None — must not raise.""" _append_notification( @@ -295,7 +302,7 @@ def test_append_notification_none() -> None: severity="high", category="sast", state="new", tool="AquaSec", ) # No assertion needed: the test passes if no exception is raised. - + # ===================================================================== # _merge_child_secmeta @@ -308,47 +315,12 @@ def test_merge_new_alert_number() -> None: "type": "child", "fingerprint": "fp1", "gh_alert_numbers": '["100"]', - "occurrence_count": "1", - "last_occurrence_fp": "old_occ", - "first_seen": "2026-01-01", - "last_seen": "2026-01-01", }) - ctx = _make_alert_context(alert_number=200, fingerprint="fp1", occurrence_fp="new_occ") - secmeta, new_occurrence = _merge_child_secmeta(ctx=ctx, issue=child) + ctx = _make_alert_context(alert_number=200, fingerprint="fp1") + secmeta = _merge_child_secmeta(ctx=ctx, issue=child) assert "200" in secmeta["gh_alert_numbers"] assert "100" in secmeta["gh_alert_numbers"] - assert new_occurrence is True - assert secmeta["occurrence_count"] == "2" - -def test_merge_same_occurrence_fp() -> None: - """Same occurrence_fp means no new occurrence counted.""" - child = _issue_with_secmeta(1, { - "type": "child", - "fingerprint": "fp1", - "gh_alert_numbers": '["100"]', - "occurrence_count": "1", - "last_occurrence_fp": "same_occ", - "first_seen": "2026-01-01", - "last_seen": "2026-01-01", - }) - ctx = _make_alert_context(alert_number=100, fingerprint="fp1", occurrence_fp="same_occ") - secmeta, new_occurrence = _merge_child_secmeta(ctx=ctx, issue=child) - assert new_occurrence is False - assert secmeta["occurrence_count"] == "1" -def test_merge_date_range_expansion() -> None: - """first_seen takes the min, last_seen takes the max.""" - child = _issue_with_secmeta(1, { - "type": "child", - "fingerprint": "fp1", - "first_seen": "2026-02-01", - "last_seen": "2026-02-15", - "occurrence_count": "1", - }) - ctx = _make_alert_context(first_seen="2026-01-15", last_seen="2026-03-01") - secmeta, _ = _merge_child_secmeta(ctx=ctx, issue=child) - assert secmeta["first_seen"] == "2026-01-15" - assert secmeta["last_seen"] == "2026-03-01" def test_merge_removes_alert_hash() -> None: """Legacy alert_hash key is dropped during merge.""" @@ -356,26 +328,11 @@ def test_merge_removes_alert_hash() -> None: "type": "child", "alert_hash": "old_hash", "fingerprint": "fp1", - "occurrence_count": "1", - "first_seen": "2026-01-01", - "last_seen": "2026-01-01", }) ctx = _make_alert_context(fingerprint="fp1") - secmeta, _ = _merge_child_secmeta(ctx=ctx, issue=child) + secmeta = _merge_child_secmeta(ctx=ctx, issue=child) assert "alert_hash" not in secmeta -def test_merge_zero_occurrence_count_reset() -> None: - """occurrence_count <= 0 is reset to at least 1.""" - child_secmeta_str = render_secmeta({ - "type": "child", "fingerprint": "fp1", - "occurrence_count": "0", "last_occurrence_fp": "same_fp", - "first_seen": "2026-01-01", "last_seen": "2026-01-01", - }) - child = Issue(number=1, state="open", title="T", body=child_secmeta_str + "\nBody\n") - ctx = _make_alert_context(fingerprint="fp1", occurrence_fp="same_fp") - secmeta, _ = _merge_child_secmeta(ctx=ctx, issue=child) - assert int(secmeta["occurrence_count"]) >= 1 - # ===================================================================== # _maybe_reopen_child @@ -389,6 +346,7 @@ def test_reopen_child_open_issue() -> None: sync = _make_sync_context() assert _maybe_reopen_child(ctx=ctx, sync=sync, issue=issue, parent_issue=None) is False + def test_reopen_child_dry_run() -> None: """Dry-run marks reopened=True and appends notification.""" body = render_secmeta({"type": "child", "category": "sast"}) + "\nbody" @@ -401,6 +359,7 @@ def test_reopen_child_dry_run() -> None: assert len(notifications) == 1 assert notifications[0].state == "reopen" + def test_reopen_child_real(mocker: MockerFixture) -> None: """Non-dry-run calls gh_issue_edit_state and appends notification.""" mock_edit = mocker.patch("security.issues.sync.gh_issue_edit_state", return_value=True) @@ -414,6 +373,7 @@ def test_reopen_child_real(mocker: MockerFixture) -> None: assert len(notifications) == 1 mock_edit.assert_called_once() + def test_reopen_child_cascades_to_parent(mocker: MockerFixture) -> None: """Reopening child also reopens the closed parent (dry-run).""" mocker.patch("security.issues.sync.gh_issue_edit_state", return_value=True) @@ -426,6 +386,45 @@ def test_reopen_child_cascades_to_parent(mocker: MockerFixture) -> None: assert parent.state == "open" +# _remove_adept_to_close_label + +def test_remove_adept_label_present(mocker: MockerFixture) -> None: + """Removes sec:adept-to-close label when present on a reopened issue.""" + mock_remove = mocker.patch("security.issues.sync.gh_issue_remove_labels") + issue = Issue(number=1, state="open", title="T", body="b", labels=["scope:security", "sec:adept-to-close"]) + _remove_adept_to_close_label("org/repo", issue, dry_run=False) + mock_remove.assert_called_once_with("org/repo", 1, ["sec:adept-to-close"]) + assert "sec:adept-to-close" not in issue.labels + + +def test_remove_adept_label_not_present(mocker: MockerFixture) -> None: + """No-op when sec:adept-to-close label is not on the issue.""" + mock_remove = mocker.patch("security.issues.sync.gh_issue_remove_labels") + issue = Issue(number=1, state="open", title="T", body="b", labels=["scope:security"]) + _remove_adept_to_close_label("org/repo", issue, dry_run=False) + mock_remove.assert_not_called() + + +def test_remove_adept_label_none_labels() -> None: + """No-op when issue.labels is None.""" + issue = Issue(number=1, state="open", title="T", body="b", labels=None) + _remove_adept_to_close_label("org/repo", issue, dry_run=False) + + +def test_reopen_child_removes_adept_label(mocker: MockerFixture) -> None: + """Reopening a child issue also removes the sec:adept-to-close label.""" + mocker.patch("security.issues.sync.gh_issue_edit_state", return_value=True) + mock_remove = mocker.patch("security.issues.sync.gh_issue_remove_labels") + body = render_secmeta({"type": "child", "category": "sast"}) + "\nbody" + issue = Issue(number=5, state="closed", title="T", body=body, labels=["scope:security", "sec:adept-to-close"]) + ctx = _make_alert_context() + sync = _make_sync_context() + result = _maybe_reopen_child(ctx=ctx, sync=sync, issue=issue, parent_issue=None) + assert result is True + mock_remove.assert_called_once_with("test-org/test-repo", 5, ["sec:adept-to-close"]) + assert "sec:adept-to-close" not in issue.labels + + # ===================================================================== # _rebuild_and_apply_child_body # ===================================================================== @@ -441,20 +440,20 @@ def test_rebuild_body_changed(mocker: MockerFixture, sast_alert: Alert) -> None: _rebuild_and_apply_child_body(ctx=ctx, sync=sync, issue=issue, secmeta=secmeta) mock_edit.assert_called_once() + def test_rebuild_body_unchanged(sast_alert: Alert) -> None: """When body is identical, no API call is made.""" from security.issues.builder import build_child_issue_body - from security.issues.sec_events import strip_sec_events_from_body secmeta = {"schema": "1", "type": "child", "fingerprint": "fp1"} human_body = build_child_issue_body(sast_alert) body = render_secmeta(secmeta) + "\n\n" + human_body - body = strip_sec_events_from_body(body) issue = Issue(number=1, state="open", title="T", body=body) ctx = _make_alert_context(alert=sast_alert) sync = _make_sync_context() _rebuild_and_apply_child_body(ctx=ctx, sync=sync, issue=issue, secmeta=secmeta) + def test_rebuild_body_dry_run(sast_alert: Alert) -> None: """In dry-run mode, body is not written via API.""" issue = Issue(number=1, state="open", title="T", body="old body") @@ -464,54 +463,6 @@ def test_rebuild_body_dry_run(sast_alert: Alert) -> None: _rebuild_and_apply_child_body(ctx=ctx, sync=sync, issue=issue, secmeta=secmeta) -# ===================================================================== -# _comment_child_event -# ===================================================================== - - -def test_comment_reopen_event(mocker: MockerFixture) -> None: - """Posts a reopen sec-event comment when reopened=True.""" - mock_comment = mocker.patch("security.issues.sync.gh_issue_comment") - issue = Issue(number=1, state="open", title="T", body="b") - ctx = _make_alert_context() - sync = _make_sync_context() - _comment_child_event(ctx=ctx, sync=sync, issue=issue, reopened=True) - mock_comment.assert_called_once() - comment_body = mock_comment.call_args[0][2] - assert "reopen" in comment_body - -def test_comment_occurrence_event_no_comment(mocker: MockerFixture) -> None: - """No sec-event comment when issue is already open (new_occurrence=True but reopened=False).""" - mock_comment = mocker.patch("security.issues.sync.gh_issue_comment") - issue = Issue(number=1, state="open", title="T", body="b") - ctx = _make_alert_context() - sync = _make_sync_context() - _comment_child_event(ctx=ctx, sync=sync, issue=issue, reopened=False) - mock_comment.assert_not_called() - -def test_comment_no_event() -> None: - """No comment when neither reopened nor new_occurrence.""" - issue = Issue(number=1, state="open", title="T", body="b") - ctx = _make_alert_context() - sync = _make_sync_context() - _comment_child_event(ctx=ctx, sync=sync, issue=issue, reopened=False) - -def test_comment_reopen_dry_run() -> None: - """Dry-run mode does not call gh_issue_comment for reopen.""" - issue = Issue(number=1, state="open", title="T", body="b") - ctx = _make_alert_context() - sync = _make_sync_context(dry_run=True) - _comment_child_event(ctx=ctx, sync=sync, issue=issue, reopened=True) - -def test_comment_occurrence_dry_run() -> None: - """No comment in any mode when issue is already open (occurrence-only path).""" - issue = Issue(number=1, state="open", title="T", body="b") - ctx = _make_alert_context() - sync = _make_sync_context(dry_run=True) - # Dry-run should also be silent for already-open issues. - _comment_child_event(ctx=ctx, sync=sync, issue=issue, reopened=False) - - # ===================================================================== # _sync_child_title_and_labels # ===================================================================== @@ -528,17 +479,19 @@ def test_sync_title_drift_corrected(mocker: MockerFixture) -> None: mock_title.assert_called_once() mock_labels.assert_called_once() + def test_sync_title_already_correct(mocker: MockerFixture) -> None: """Title is not updated when it matches the expected format.""" mock_labels = mocker.patch("security.issues.sync.gh_issue_add_labels") from security.issues.builder import build_issue_title - title = build_issue_title("sast", "CVE-2026-1234", "fp_test_123") + title = build_issue_title("Test finding description", "sast", "CVE-2026-1234", "fp_test_123") issue = Issue(number=1, state="open", title=title, body="b") ctx = _make_alert_context(rule_name="sast", rule_id="CVE-2026-1234", fingerprint="fp_test_123") sync = _make_sync_context() _sync_child_title_and_labels(ctx=ctx, sync=sync, issue=issue) mock_labels.assert_called_once() + def test_sync_title_dry_run() -> None: """Dry-run mode logs instead of calling gh.""" issue = Issue(number=1, state="open", title="Wrong", body="b") @@ -555,7 +508,6 @@ def test_sync_title_dry_run() -> None: def test_handle_new_child_creates_issue(mocker: MockerFixture, sast_alert: Alert) -> None: """Creates a new issue and registers it in the index.""" mock_create = mocker.patch("security.issues.sync.gh_issue_create", return_value=42) - mocker.patch("security.issues.sync.gh_issue_comment") ctx = _make_alert_context(alert=sast_alert, rule_name="sast") issues: dict[int, Issue] = {} index = IssueIndex(by_fingerprint={}, parent_by_rule_id={}) @@ -567,6 +519,7 @@ def test_handle_new_child_creates_issue(mocker: MockerFixture, sast_alert: Alert assert len(sync.notifications) == 1 assert sync.notifications[0].state == "new" + def test_handle_new_child_dry_run(sast_alert: Alert) -> None: """Dry-run does not call gh_issue_create but records notification.""" ctx = _make_alert_context(alert=sast_alert) @@ -576,17 +529,18 @@ def test_handle_new_child_dry_run(sast_alert: Alert) -> None: assert len(notifications) == 1 assert notifications[0].issue_number == 0 + def test_handle_new_child_links_to_parent(mocker: MockerFixture, sast_alert: Alert) -> None: """When a parent issue exists, the child is linked as a sub-issue.""" mocker.patch("security.issues.sync.gh_issue_create", return_value=42) mock_sub = mocker.patch("security.issues.sync.gh_issue_add_sub_issue_by_number") - mocker.patch("security.issues.sync.gh_issue_comment") parent = Issue(number=1, state="open", title="P", body="pb") ctx = _make_alert_context(alert=sast_alert) sync = _make_sync_context(notifications=[]) _handle_new_child_issue(ctx=ctx, sync=sync, parent_issue=parent) mock_sub.assert_called_once_with("test-org/test-repo", 1, 42) + def test_handle_new_child_create_fails(mocker: MockerFixture, sast_alert: Alert) -> None: """If gh_issue_create returns None, no crash and no index update.""" mocker.patch("security.issues.sync.gh_issue_create", return_value=None) @@ -672,7 +626,6 @@ def test_ensure_child_linked_api_failure_no_cache_update(mocker: MockerFixture) def test_ensure_parent_creates_new(mocker: MockerFixture, sast_alert: Alert) -> None: """Creates a parent issue when none exists for the rule_id.""" mock_create = mocker.patch("security.issues.sync.gh_issue_create", return_value=99) - mocker.patch("security.issues.sync.gh_issue_comment") issues: dict[int, Issue] = {} index = IssueIndex(by_fingerprint={}, parent_by_rule_id={}) result = ensure_parent_issue(sast_alert, issues, index, dry_run=False) @@ -681,6 +634,7 @@ def test_ensure_parent_creates_new(mocker: MockerFixture, sast_alert: Alert) -> mock_create.assert_called_once() assert sast_alert.metadata.rule_id in index.parent_by_rule_id + def test_ensure_parent_dry_run(sast_alert: Alert) -> None: """Dry-run does not create an issue, returns None.""" issues: dict[int, Issue] = {} @@ -688,14 +642,13 @@ def test_ensure_parent_dry_run(sast_alert: Alert) -> None: result = ensure_parent_issue(sast_alert, issues, index, dry_run=True) assert result is None + def test_ensure_parent_existing_returns_existing(sast_alert: Alert) -> None: """Returns the existing parent issue if one already exists.""" parent = _issue_with_secmeta(10, { "type": "parent", "rule_id": sast_alert.metadata.rule_id, "severity": "high", - "first_seen": "2026-01-01", - "last_seen": "2026-01-01", }) issues = {10: parent} index = build_issue_index(issues) @@ -703,14 +656,13 @@ def test_ensure_parent_existing_returns_existing(sast_alert: Alert) -> None: assert result is not None assert result.number == 10 + def test_ensure_parent_severity_change_detected(sast_alert: Alert) -> None: """Severity change is detected and recorded.""" parent = _issue_with_secmeta(10, { "type": "parent", "rule_id": sast_alert.metadata.rule_id, "severity": "low", - "first_seen": "2026-01-01", - "last_seen": "2026-01-01", }) issues = {10: parent} index = build_issue_index(issues) @@ -720,6 +672,7 @@ def test_ensure_parent_severity_change_detected(sast_alert: Alert) -> None: assert changes[0].old_severity == "low" assert changes[0].new_severity == "high" + def test_ensure_parent_no_rule_id() -> None: """Returns None when alert has no rule_id.""" alert = Alert.from_dict({"metadata": {"rule_id": ""}, "alert_details": {}, "rule_details": {}}) @@ -727,6 +680,7 @@ def test_ensure_parent_no_rule_id() -> None: index = IssueIndex(by_fingerprint={}, parent_by_rule_id={}) assert ensure_parent_issue(alert, issues, index, dry_run=False) is None + def test_ensure_parent_create_fails(mocker: MockerFixture, sast_alert: Alert) -> None: """Returns None if gh_issue_create fails.""" mocker.patch("security.issues.sync.gh_issue_create", return_value=None) @@ -735,14 +689,13 @@ def test_ensure_parent_create_fails(mocker: MockerFixture, sast_alert: Alert) -> result = ensure_parent_issue(sast_alert, issues, index, dry_run=False) assert result is None + def test_ensure_parent_body_deferred(sast_alert: Alert) -> None: """Parent body update is deferred (snapshot captured in parent_original_bodies).""" parent = _issue_with_secmeta(10, { "type": "parent", "rule_id": sast_alert.metadata.rule_id, "severity": "high", - "first_seen": "2026-01-01", - "last_seen": "2026-01-01", }) original_body = parent.body issues = {10: parent} @@ -752,6 +705,7 @@ def test_ensure_parent_body_deferred(sast_alert: Alert) -> None: assert 10 in bods assert bods[10][1] == original_body + def test_ensure_parent_title_drift_corrected(mocker: MockerFixture, sast_alert: Alert) -> None: """Title is updated when it drifts from the expected format.""" mock_title = mocker.patch("security.issues.sync.gh_issue_edit_title", return_value=True) @@ -759,8 +713,6 @@ def test_ensure_parent_title_drift_corrected(mocker: MockerFixture, sast_alert: "type": "parent", "rule_id": sast_alert.metadata.rule_id, "severity": "high", - "first_seen": "2026-01-01", - "last_seen": "2026-01-01", }) parent.title = "Wrong old title" issues = {10: parent} @@ -782,18 +734,21 @@ def test_flush_writes_changed_bodies(mocker: MockerFixture) -> None: _flush_parent_body_updates(bods, {1: issue}, dry_run=False) mock_edit.assert_called_once_with("org/repo", 1, "new body") + def test_flush_skips_unchanged() -> None: """Does not call API if body is unchanged.""" issue = Issue(number=1, state="open", title="T", body="same body") bods = {1: ("org/repo", "same body")} _flush_parent_body_updates(bods, {1: issue}, dry_run=False) + def test_flush_dry_run() -> None: """Dry-run logs instead of calling API.""" issue = Issue(number=1, state="open", title="T", body="new body") bods = {1: ("org/repo", "old body")} _flush_parent_body_updates(bods, {1: issue}, dry_run=True) + def test_flush_missing_issue() -> None: """Skips silently if issue is no longer in the dict.""" bods = {99: ("org/repo", "old body")} @@ -810,10 +765,12 @@ def test_label_orphan_no_orphans() -> None: child = _issue_with_secmeta(1, {"type": "child", "fingerprint": "fp1"}) index = build_issue_index({1: child}) alerts: dict[int, Alert] = { - 100: Alert.from_dict({"metadata": {"state": "open"}, "alert_details": {"alert_hash": "fp1"}, "rule_details": {}}), + 100: Alert.from_dict( + {"metadata": {"state": "open"}, "alert_details": {"alert_hash": "fp1"}, "rule_details": {}}), } _label_orphan_issues(alerts, index, dry_run=False) + def test_label_orphan_found(mocker: MockerFixture) -> None: """Labels child issues that have no matching alert.""" mock_labels = mocker.patch("security.issues.sync.gh_issue_add_labels") @@ -827,6 +784,7 @@ def test_label_orphan_found(mocker: MockerFixture) -> None: label_args = mock_labels.call_args[0][2] assert "sec:adept-to-close" in label_args + def test_label_orphan_dry_run() -> None: """Dry-run: logs but does not call gh.""" child = _issue_with_secmeta(1, { @@ -835,6 +793,7 @@ def test_label_orphan_dry_run() -> None: index = build_issue_index({1: child}) _label_orphan_issues({}, index, dry_run=True) + def test_label_orphan_skips_already_labelled() -> None: """Skips if adept-to-close label is already present.""" child = _issue_with_secmeta(1, { @@ -844,6 +803,7 @@ def test_label_orphan_skips_already_labelled() -> None: index = build_issue_index({1: child}) _label_orphan_issues({}, index, dry_run=False) + def test_label_orphan_skips_closed_issues() -> None: """Closed child issues are not labelled as orphans.""" child = _issue_with_secmeta(1, { @@ -852,6 +812,7 @@ def test_label_orphan_skips_closed_issues() -> None: index = build_issue_index({1: child}) _label_orphan_issues({}, index, dry_run=False) + def test_label_orphan_no_repo_in_secmeta() -> None: """Skips labelling if no repo in secmeta.""" child = _issue_with_secmeta(1, { @@ -919,6 +880,7 @@ def test_ensure_issue_dry_run(sast_alert: Alert) -> None: assert len(notifications) == 1 assert notifications[0].issue_number == 0 + def test_ensure_issue_skips_non_open() -> None: """Alerts with state != 'open' are skipped.""" alert = Alert.from_dict({ @@ -930,6 +892,7 @@ def test_ensure_issue_skips_non_open() -> None: index = IssueIndex(by_fingerprint={}, parent_by_rule_id={}) ensure_issue(alert, _make_sync_context(issues=issues, index=index, dry_run=True)) + def test_ensure_issue_missing_alert_hash_raises() -> None: """Raises SystemExit when alert hash is missing.""" alert = Alert.from_dict({ @@ -942,6 +905,7 @@ def test_ensure_issue_missing_alert_hash_raises() -> None: with pytest.raises(SystemExit, match="alert_hash"): ensure_issue(alert, _make_sync_context(issues=issues, index=index, dry_run=True)) + def test_ensure_issue_missing_alert_details_raises() -> None: """Raises SystemExit when alert_details has no alert_hash.""" alert = Alert.from_dict({ @@ -966,17 +930,19 @@ def test_sync_empty() -> None: assert result.notifications == [] assert result.severity_changes == [] + def test_sync_dry_run_single_alert(sast_alert: Alert) -> None: """Single alert dry-run produces a notification.""" alerts = {303: sast_alert} result = sync_alerts_and_issues(alerts, {}, dry_run=True) assert len(result.notifications) == 1 + def test_sync_severity_change_detected(sast_alert: Alert) -> None: """Severity change on existing parent is captured in result.""" parent = _issue_with_secmeta(10, { "type": "parent", "rule_id": sast_alert.metadata.rule_id, - "severity": "low", "first_seen": "2026-01-01", "last_seen": "2026-01-01", + "severity": "low", }) issues = {10: parent} result = sync_alerts_and_issues({303: sast_alert}, issues, dry_run=True) @@ -1015,6 +981,7 @@ def test_init_priority_sync_no_map() -> None: ) assert result is None + def test_init_priority_sync_no_project_number() -> None: """Returns None when project_number is falsy (0 or None).""" result = _init_priority_sync( @@ -1022,6 +989,7 @@ def test_init_priority_sync_no_project_number() -> None: ) assert result is None + def test_init_priority_sync_no_org_returns_none() -> None: """Returns None with warning when org cannot be determined.""" alerts = {1: Alert.from_dict({"_repo": ""})} @@ -1031,6 +999,7 @@ def test_init_priority_sync_no_org_returns_none() -> None: ) assert result is None + def test_init_priority_sync_field_lookup_fails(mocker: MockerFixture) -> None: """Returns None when gh_project_get_priority_field fails.""" mocker.patch("security.issues.sync.gh_project_get_priority_field", return_value=None) diff --git a/tests/security/issues/test_templates.py b/tests/security/issues/test_templates.py index 97265d2..9399980 100644 --- a/tests/security/issues/test_templates.py +++ b/tests/security/issues/test_templates.py @@ -102,7 +102,7 @@ def test_child_contains_all_placeholders() -> None: "{{ avd_id }}", "{{ alert_hash }}", "{{ title }}", "{{ message }}", "{{ repository_full_name }}", "{{ file_display }}", "{{ file_permalink }}", "{{ package_name }}", "{{ installed_version }}", "{{ fixed_version }}", - "{{ reachable }}", "{{ scan_date }}", "{{ first_seen }}", + "{{ reachable }}", "{{ first_seen }}", ] for ph in expected_placeholders: assert ph in CHILD_BODY_TEMPLATE, f"Missing placeholder: {ph}" @@ -120,7 +120,6 @@ def test_child_renders_without_error() -> None: "installed_version": "1.0", "fixed_version": "2.0", "reachable": "True", - "scan_date": "2026-01-01", "first_seen": "2026-01-01", } result = render_markdown_template(CHILD_BODY_TEMPLATE, values) diff --git a/tests/security/test_collect_alert.py b/tests/security/test_collect_alert.py index 57fce59..bc701b3 100644 --- a/tests/security/test_collect_alert.py +++ b/tests/security/test_collect_alert.py @@ -49,6 +49,7 @@ "rule": { "id": "rule-1", "name": "sast", + "description": "Requests with verify=False", "security_severity_level": "high", "severity": "error", "tags": ["HIGH", "sast"], @@ -263,6 +264,7 @@ def test_normalise_alert_metadata() -> None: assert meta["state"] == "open" assert meta["rule_id"] == "rule-1" assert meta["rule_name"] == "sast" + assert meta["rule_description"] == "Requests with verify=False" assert meta["severity"] == "high" assert meta["confidence"] == "error" assert meta["tags"] == ["HIGH", "sast"] @@ -298,6 +300,7 @@ def test_normalise_alert_minimal() -> None: assert meta["alert_number"] is None assert meta["state"] is None assert meta["rule_id"] is None + assert meta["rule_description"] is None assert meta["tool"] is None assert meta["file"] is None assert meta["tags"] == [] diff --git a/tests/security/test_constants.py b/tests/security/test_constants.py index 22b5fbd..91560d2 100644 --- a/tests/security/test_constants.py +++ b/tests/security/test_constants.py @@ -21,8 +21,6 @@ LABEL_SCOPE_SECURITY, LABEL_SEC_ADEPT_TO_CLOSE, LABEL_TYPE_TECH_DEBT, - SEC_EVENT_OPEN, - SEC_EVENT_REOPEN, SECMETA_TYPE_CHILD, SECMETA_TYPE_PARENT, ) @@ -41,13 +39,6 @@ def test_adept_to_close() -> None: assert LABEL_SEC_ADEPT_TO_CLOSE == "sec:adept-to-close" -def test_open() -> None: - assert SEC_EVENT_OPEN == "open" - -def test_reopen() -> None: - assert SEC_EVENT_REOPEN == "reopen" - - def test_parent() -> None: assert SECMETA_TYPE_PARENT == "parent"