Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Patterns
- Use private methods (`_method_name`) for internal class helpers
- All logs must start with "<Domain> -" prefix (e.g., "Security -")
- Never disable pylint behavior in the code
- External links should link to a new window

Testing
- Mirror src structure: `src/security/module.py` -> `tests/security/test_module.py`
Expand All @@ -61,5 +62,6 @@ Testing
- Use `@pytest.mark.parametrize` for data-driven tests (negative/failure scenarios with multiple similar cases)

Quality gates (run after changes, fix only if below threshold)
- Do all changes at once and run the gates afterward
- Run all quality gates at once: `make qa`
- Once a quality gate passes, do not re-run it in different scenarios
2 changes: 1 addition & 1 deletion .github/workflows/aquasec-scan.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ jobs:
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd
with:
repository: AbsaOSS/organizational-workflows
ref: ${{ github.job_workflow_sha }}
ref: ${{ job.workflow_sha }}
path: org-workflows
persist-credentials: false

Expand Down
17 changes: 17 additions & 0 deletions src/core/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,23 @@ def normalize_path(path: str | None) -> str:
return p


def normalize_bullet_list(text: str) -> str:
"""Normalize a Markdown bullet list so all items are at the top level.

Some sources (e.g. AquaSec) emit the first bullet without indentation
and subsequent bullets with leading whitespace, producing a nested list.
This function strips leading whitespace from any line whose stripped form
starts with ``- ``, so every item renders at the same level.
"""
if not text:
return text
lines = []
for line in text.splitlines():
stripped = line.lstrip()
lines.append(f"- {stripped[2:]}" if stripped.startswith("- ") else line)
return "\n".join(lines)


def sanitize_markdown(text: str) -> str:
"""Escape block-level Markdown so text renders as plain content in an issue body."""
if not text:
Expand Down
56 changes: 56 additions & 0 deletions src/core/rendering.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import re
from typing import Any

from security.constants import NOT_AVAILABLE

PLACEHOLDER_RE = re.compile(r"\{\{\s*([a-zA-Z0-9_\.]+)\s*\}\}")


Expand Down Expand Up @@ -49,3 +51,57 @@ def repl(match: re.Match[str]) -> str:
return str(v)

return PLACEHOLDER_RE.sub(repl, template)


def strip_na_sections(body: str) -> str:
"""Remove N/A fields and empty sections from a rendered Markdown body.

- Lines like ``- **Key:** N/A`` (with optional trailing whitespace/italics) are removed.
- Section headers (``## Heading``) with no remaining content are removed.
"""
lines = body.split("\n")
filtered: list[str] = []
i = 0
while i < len(lines):
line = lines[i]

# Remove bullet-list N/A fields: "- **Key:** N/A" with optional trailing content
if re.match(r"^\s*-\s+\*\*[^*]+:\*\*\s*N/A\s*$", line):
i += 1
# Also remove subsequent italic description lines
while i < len(lines) and re.match(r"^\s+\*\(.*\)\*\s*$", lines[i]):
i += 1
continue

filtered.append(line)
i += 1

# Second pass: remove ## headings whose section body is empty or just N/A
result: list[str] = []
i = 0
while i < len(filtered):
line = filtered[i]

if re.match(r"^##\s+", line):
# Collect the section body (until next ## heading or end)
section_header = line
section_body_lines: list[str] = []
j = i + 1
while j < len(filtered) and not re.match(r"^##\s+", filtered[j]):
section_body_lines.append(filtered[j])
j += 1

# Check if section body is effectively empty
body_text = "\n".join(section_body_lines).strip()
if not body_text or body_text == NOT_AVAILABLE:
i = j
continue

result.append(section_header)
result.extend(section_body_lines)
i = j
else:
result.append(line)
i += 1

return "\n".join(result)
27 changes: 24 additions & 3 deletions src/security/collect_alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
logger = logging.getLogger(__name__)

VALID_STATES = {"open", "dismissed", "fixed", "all"}

RULE_DETAIL_KEYS = [
"Type",
"Severity",
Expand All @@ -47,6 +46,7 @@
"OWASP",
"References",
]
MULTILINE_KEYS = {"references", "owasp"}


def _snake_case(name: str) -> str:
Expand All @@ -59,12 +59,33 @@ def _help_value(rule_help: str | None, name: str) -> str | None:
if not rule_help:
return None
m = re.search(rf"\*\*{re.escape(name)}:\*\*\s*([^\n\r]+)", rule_help, re.IGNORECASE)
return m.group(1) if m else None
return m.group(1).strip() if m else None


def _help_multiline_value(rule_help: str | None, name: str) -> str | None:
"""Extract a multi-line value from ``**Name:**`` markup in the rule help text."""
if not rule_help:
return None
m = re.search(
rf"\*\*{re.escape(name)}:\*\*[ \t]*([^\n\r]*(?:\n(?!\*\*[A-Za-z][\w\s]*?:\*\*)[^\n\r]*)*)",
rule_help,
re.IGNORECASE,
)
if not m:
return None
return m.group(1).strip() or None


def _parse_rule_details(rule_help: str | None) -> dict[str, str | None]:
"""Extract known rule detail fields from ``**Key:** value`` markup in rule help."""
return {_snake_case(key): _help_value(rule_help, key) for key in RULE_DETAIL_KEYS}
result: dict[str, str | None] = {}
for key in RULE_DETAIL_KEYS:
snake = _snake_case(key)
if snake in MULTILINE_KEYS:
result[snake] = _help_multiline_value(rule_help, key)
else:
result[snake] = _help_value(rule_help, key)
return result


def _parse_alert_details(message_text: str) -> dict[str, str]:
Expand Down
3 changes: 3 additions & 0 deletions src/security/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@
SECMETA_TYPE_PARENT = "parent"
SECMETA_TYPE_CHILD = "child"

SECURITY_FINDING_DEFAULT = "Security finding"
NOT_AVAILABLE = "N/A"

GITHUB_BASE_URL = "https://github.com"
91 changes: 60 additions & 31 deletions src/security/issues/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,31 @@

"""Issue title / body construction from Alert dataclasses."""

import html
from typing import Any

from core.helpers import iso_date, sanitize_markdown
from core.rendering import render_markdown_template
from core.helpers import iso_date, normalize_bullet_list, sanitize_markdown
from core.rendering import render_markdown_template, strip_na_sections

from security.constants import NOT_AVAILABLE, SECMETA_TYPE_PARENT, SECURITY_FINDING_DEFAULT, GITHUB_BASE_URL

from security.constants import NOT_AVAILABLE, SECMETA_TYPE_PARENT
from security.alerts.models import Alert
from security.issues.secmeta import render_secmeta
from security.issues.templates import CHILD_BODY_TEMPLATE, PARENT_BODY_TEMPLATE


def _new_window_link(text: str, url: str | None) -> str:
"""Wrap *text* in an HTML anchor that opens in a new window.

Returns plain *text* when *url* is empty or ``N/A``.
"""
safe_text = html.escape(text or "", quote=False)
if url and url != NOT_AVAILABLE:
safe_url = html.escape(url, quote=True)
return f'<a href="{safe_url}" target="_blank" rel="noopener noreferrer">{safe_text}</a>'
return safe_text


def _synthesize_references(alert: Alert) -> str:
"""Build a Markdown bullet list from metadata URLs when rule_details.references is absent."""
lines = []
Expand All @@ -45,7 +59,6 @@ def _synthesize_owasp(alert: Alert) -> str:

def alert_extra_data(alert: Alert) -> dict[str, Any]:
"""Build the extra-data dict for parent issue templates from nested alert data."""
rule_id = alert.metadata.rule_id
references = alert.rule_details.references
if references == NOT_AVAILABLE:
references = _synthesize_references(alert)
Expand All @@ -54,14 +67,15 @@ def alert_extra_data(alert: Alert) -> dict[str, Any]:
owasp = _synthesize_owasp(alert)

return {
"cve": rule_id if rule_id.upper().startswith("CVE-") else NOT_AVAILABLE,
"owasp": owasp,
"rule_id": alert.metadata.rule_id,
"owasp": sanitize_markdown(normalize_bullet_list(owasp)),
"category": alert.metadata.rule_name or NOT_AVAILABLE,
"impact": alert.rule_details.impact,
"likelihood": alert.rule_details.likelihood,
"confidence": alert.rule_details.confidence,
"advisory_url": alert.metadata.help_uri or NOT_AVAILABLE,
"impact": sanitize_markdown(alert.rule_details.impact),
"likelihood": sanitize_markdown(alert.rule_details.likelihood),
"confidence": sanitize_markdown(alert.rule_details.confidence),
"remediation": sanitize_markdown(alert.rule_details.remediation),
"references": references,
"references": sanitize_markdown(normalize_bullet_list(references)),
}


Expand All @@ -70,10 +84,9 @@ def classify_category(alert: Alert) -> str:
return alert.metadata.rule_name


def build_parent_issue_title(rule_id: str, severity: str = "") -> str:
def build_parent_issue_title(rule_id: str) -> str:
"""Build the title string for a parent issue."""
sev_tag = f"[{severity.upper()}] " if severity else ""
return f"{sev_tag}Security Alert – {rule_id}".strip()
return f"Security Alert – {rule_id}".strip()
Comment thread
tmikula-dev marked this conversation as resolved.


def build_parent_template_values(alert: Alert, *, rule_id: str, severity: str) -> dict[str, Any]:
Expand All @@ -85,11 +98,11 @@ def build_parent_template_values(alert: Alert, *, rule_id: str, severity: str) -
extra = alert_extra_data(alert)

return {
"category": alert.metadata.rule_name or NOT_AVAILABLE,
"avd_id": alert.alert_details.vulnerability or rule_id,
"title": sanitize_markdown(alert.metadata.rule_description or rule_id),
"category": alert.metadata.rule_name or NOT_AVAILABLE,
"severity": severity,
"published_date": iso_date(alert.rule_details.published_date or NOT_AVAILABLE),
"short_description": sanitize_markdown(alert.metadata.rule_description or NOT_AVAILABLE),
"package_name": alert.rule_details.package_name,
"fixed_version": alert.rule_details.fixed_version,
"extraData": extra,
Expand All @@ -110,20 +123,20 @@ def build_parent_issue_body(alert: Alert) -> str:
}

values = build_parent_template_values(alert, rule_id=rule_id, severity=severity)
human_body = render_markdown_template(PARENT_BODY_TEMPLATE, values).strip() + "\n"
human_body = strip_na_sections(render_markdown_template(PARENT_BODY_TEMPLATE, values)).strip() + "\n"
return render_secmeta(secmeta) + "\n\n" + human_body


def build_issue_title(
rule_description: str | None,
rule_name: str | None,
rule_id: str,
fingerprint: str,
severity: str = "",
) -> str:
"""Build the title string for a child issue."""
prefix = fingerprint[:8] if fingerprint else NOT_AVAILABLE
summary = (rule_description or rule_name or rule_id or "Security finding").strip()
return f"[SEC][FP={prefix}] {summary}"
sev_tag = f":{severity.upper()}" if severity else ""
summary = (rule_description or SECURITY_FINDING_DEFAULT).strip()
return f"[SEC{sev_tag}][FP={prefix}] {summary}"


def build_child_issue_body(alert: Alert) -> str:
Expand All @@ -132,42 +145,58 @@ def build_child_issue_body(alert: Alert) -> str:
if not repo_full:
repo_full = alert.alert_details.repository

vulnerability = alert.alert_details.vulnerability
avd_id = vulnerability if vulnerability.startswith("AVD-") else NOT_AVAILABLE

title = sanitize_markdown(alert.metadata.rule_description or alert.metadata.rule_id)

# Use artifact as the display file name, fall back to scm_file basename
artifact = alert.alert_details.artifact
scm_file = alert.alert_details.scm_file

if artifact and artifact != NOT_AVAILABLE:
file_display = artifact.rsplit("/", 1)[-1] if "/" in artifact else artifact
elif scm_file and scm_file != NOT_AVAILABLE:
file_display = scm_file.rsplit("/", 1)[-1]
else:
file_display = NOT_AVAILABLE

start_line = alert.metadata.start_line
start_line_str = str(start_line) if start_line is not None else ""

# Build a display name (filename only) and permalink with #L anchor
file_name = scm_file.rsplit("/", 1)[-1] if scm_file and scm_file != NOT_AVAILABLE else None
if scm_file and scm_file != NOT_AVAILABLE and start_line_str:
file_permalink = f"{scm_file}#L{start_line_str}"
file_display = f"{file_name}#L{start_line_str}"
else:
file_permalink = scm_file if scm_file != NOT_AVAILABLE else ""
file_display = file_name or NOT_AVAILABLE

# Start/End line logic
end_line = alert.metadata.end_line
if start_line is not None:
start_line_val = str(start_line)
end_line_val = str(end_line) if end_line is not None else start_line_val
else:
start_line_val = NOT_AVAILABLE
end_line_val = NOT_AVAILABLE

alert_hash = alert.alert_details.alert_hash
message = sanitize_markdown(alert.alert_details.message)

category = classify_category(alert)
repository_link = _new_window_link(repo_full, f"{GITHUB_BASE_URL}/{repo_full}")

values: dict[str, Any] = {
"severity": alert.metadata.severity,
"category": category or NOT_AVAILABLE,
"avd_id": avd_id,
"rule_id": alert.metadata.rule_id,
"alert_hash": alert_hash,
"title": title,
"first_seen": iso_date(alert.alert_details.first_seen or alert.metadata.created_at),
"message": message,
"repository_full_name": repo_full,
"repository_link": repository_link,
"file_display": file_display,
"file_permalink": file_permalink,
"start_line": start_line_val,
"end_line": end_line_val,
"package_name": alert.rule_details.package_name,
"installed_version": alert.alert_details.installed_version,
"fixed_version": alert.rule_details.fixed_version,
"reachable": alert.alert_details.reachable,
"first_seen": iso_date(alert.alert_details.first_seen or alert.metadata.created_at or NOT_AVAILABLE),
}
return render_markdown_template(CHILD_BODY_TEMPLATE, values).strip() + "\n"
return strip_na_sections(render_markdown_template(CHILD_BODY_TEMPLATE, values)).strip() + "\n"
1 change: 0 additions & 1 deletion src/security/issues/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ class AlertContext:
rule_name: str
rule_description: str
severity: str
cve: str
path: str
start_line: int | None
end_line: int | None
Expand Down
Loading