diff --git a/README.md b/README.md index e012134..c988dd7 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,7 @@ conviso --help - Vulnerabilities with local field filter (auto deep for deep fields): `python -m conviso.app vulns list --company-id 443 --all --contains codeSnippet=eval( --contains fileName=app.py` - Vulnerabilities (DAST/WEB) search by request/response: `python -m conviso.app vulns list --company-id 443 --types DAST_FINDING,WEB_VULNERABILITY --all --contains request=Authorization --contains response=stacktrace` - Vulnerabilities with forced deep local search: `python -m conviso.app vulns list --company-id 443 --all --contains codeSnippet=eval( --deep-search --workers 8` +- Vulnerabilities (SCA) checking patches against OSV: `python -m conviso.app vulns check-sca-patches --company-id 443 --severities HIGH,CRITICAL --status RISK_ACCEPTED --all` - Vulnerability timeline (by vulnerability ID): `python -m conviso.app vulns timeline --id 12345` - Vulnerabilities timeline by project: `python -m conviso.app vulns timeline --company-id 443 --project-id 26102` - Last user who changed vuln status: `python -m conviso.app vulns timeline --id 12345 --last-status-change-only` diff --git a/src/conviso/VERSION b/src/conviso/VERSION index 0f82685..6678432 100644 --- a/src/conviso/VERSION +++ b/src/conviso/VERSION @@ -1 +1 @@ -0.3.7 +0.3.8 diff --git a/src/conviso/commands/vulnerabilities.py b/src/conviso/commands/vulnerabilities.py index 17ac69b..727e22f 100644 --- a/src/conviso/commands/vulnerabilities.py +++ b/src/conviso/commands/vulnerabilities.py @@ -10,6 +10,8 @@ import json import re from datetime import date, datetime, timedelta, timezone +from collections import defaultdict +import itertools import requests import time from conviso.core.notifier import info, error, summary, success, warning, timed_summary @@ -2100,3 +2102,291 @@ def clean_source_payload(base): except Exception as exc: error(f"Error updating vulnerability: {exc}") raise typer.Exit(code=1) + 
# ---------------------- CHECK SCA PATCHES ---------------------- #
@app.command("check-sca-patches", help="Check OSV for available patches for SCA vulnerabilities and optionally update them.")
def check_sca_patches(
    company_id: int = typer.Option(..., "--company-id", "-c", help="Company ID."),
    asset_ids: Optional[str] = typer.Option(None, "--asset-ids", "-a", help="Comma-separated asset IDs to filter."),
    severities: Optional[str] = typer.Option(None, "--severities", "-s", help="Comma-separated severities (NOTIFICATION,LOW,MEDIUM,HIGH,CRITICAL)."),
    status: Optional[str] = typer.Option(None, "--status", help="Comma-separated vulnerability status labels."),
    asset_tags: Optional[str] = typer.Option(None, "--asset-tags", "-t", help="Comma-separated asset tags."),
    cves: Optional[str] = typer.Option(None, "--cves", help="Comma-separated CVE identifiers."),
    page: int = typer.Option(1, "--page", "-p", help="Page number."),
    per_page: int = typer.Option(50, "--per-page", "-l", help="Items per page."),
    all_pages: bool = typer.Option(False, "--all", help="Fetch all pages."),
    fmt: str = typer.Option("table", "--format", "-f", help="Output format: table, json, csv."),
    output: Optional[str] = typer.Option(None, "--output", "-o", help="Output file for json/csv."),
):
    """Check OSV for available patches for open SCA vulnerabilities without a patched version.

    Flow:
      1. Fetch SCA findings from the Conviso GraphQL API (optionally all pages, in parallel).
      2. Keep only findings missing ``patchedVersion`` that have a CVE or a package name.
      3. Group them by (cve, package, version) and query OSV once per group, in parallel.
      4. Render / export the findings for which OSV reports a fixed version.

    Exits with code 1 on invalid severity/status input or on a fetch error.
    """
    info(f"Checking SCA patches for company {company_id}...")

    def _split_ids(value: Optional[str]):
        """Parse a comma-separated string into a list of ints; non-numeric tokens are skipped (best-effort)."""
        if not value:
            return None
        ids = []
        for raw in value.split(","):
            raw = raw.strip()
            if not raw:
                continue
            try:
                ids.append(int(raw))
            except ValueError:
                continue
        return ids or None

    def _split_strs(value: Optional[str]):
        """Parse a comma-separated string into a list of non-empty trimmed strings."""
        if not value:
            return None
        vals = [v.strip() for v in value.split(",") if v.strip()]
        return vals or None

    query_sca = """
    query IssuesSca($companyId: ID!, $pagination: PaginationInput!, $filters: IssuesFiltersInput) {
      issues(companyId: $companyId, pagination: $pagination, filters: $filters) {
        collection {
          id
          title
          status
          type
          asset {
            id
            name
            assetsTagList
          }
          ... on ScaFinding {
            severity
            detail {
              package
              affectedVersion
              patchedVersion
              cve
            }
          }
        }
        metadata {
          currentPage
          totalPages
        }
      }
    }
    """

    SEVERITY_ALLOWED = {"NOTIFICATION", "LOW", "MEDIUM", "HIGH", "CRITICAL"}
    STATUS_ALLOWED = {"CREATED", "DRAFT", "IDENTIFIED", "IN_PROGRESS", "AWAITING_VALIDATION", "FIX_ACCEPTED", "RISK_ACCEPTED", "FALSE_POSITIVE", "SUPPRESSED"}

    assets_list = _split_ids(asset_ids)

    # Validate severities / statuses up front so the user gets a clear error
    # before any network round-trips.
    severities_list = None
    if severities:
        severities_list = [s.strip().upper() for s in severities.split(",") if s.strip()]
        for s in severities_list:
            if s not in SEVERITY_ALLOWED:
                error(f"Invalid severity '{s}'. Allowed: {', '.join(SEVERITY_ALLOWED)}")
                raise typer.Exit(code=1)

    status_list = None
    if status:
        status_list = [s.strip().upper() for s in status.split(",") if s.strip()]
        for s in status_list:
            if s not in STATUS_ALLOWED:
                error(f"Invalid status '{s}'. Allowed: {', '.join(STATUS_ALLOWED)}")
                raise typer.Exit(code=1)

    asset_tags_list = _split_strs(asset_tags)
    cves_list = _split_strs(cves)

    filters = {
        "failureTypes": ["SCA_FINDING"],
        # Default to "open" statuses when none were requested explicitly.
        "statuses": status_list or ["CREATED", "IDENTIFIED", "IN_PROGRESS", "AWAITING_VALIDATION"],
    }
    if assets_list:
        filters["assetIds"] = assets_list
    if severities_list:
        filters["severities"] = severities_list
    if asset_tags_list:
        filters["assetTags"] = asset_tags_list
    if cves_list:
        filters["cves"] = cves_list

    fetch_all = all_pages
    current_page = page
    # Use a large page size when fetching everything to minimize round-trips.
    effective_per_page = per_page if not fetch_all else 200

    # Findings grouped by the OSV query key (cve, package, version) so OSV is
    # queried only once per distinct key.
    grouped_issues = defaultdict(list)
    total_issues_count = 0

    def _fetch_page(page_num: int):
        """Fetch one page of issues; returns (page_num, response).

        Builds a fresh variables dict on every call. The previous implementation
        shallow-copied a shared base dict and then mutated the nested
        ``pagination`` dict, so concurrent fetches via parallel_map raced on the
        same "page" value and could fetch wrong/duplicate pages.
        """
        vars_page = {
            "companyId": str(company_id),
            "pagination": {"page": page_num, "perPage": effective_per_page},
            "filters": filters,
        }
        res = graphql_request(query_sca, vars_page, log_request=True, verbose_only=True)
        return page_num, res

    def _process_collection(collection):
        """Accumulate SCA findings missing patchedVersion into grouped_issues."""
        nonlocal total_issues_count
        for vuln in collection:
            if vuln.get("type") != "SCA_FINDING":
                continue

            detail = vuln.get("detail") or {}
            patched_ver = detail.get("patchedVersion")
            cve = detail.get("cve")
            package = detail.get("package")

            # Only findings we can actually look up on OSV (need a CVE or package).
            if not patched_ver and (cve or package):
                asset = vuln.get("asset") or {}
                tags = ", ".join(asset.get("assetsTagList") or [])
                severity_value = vuln.get("severity") or ""

                # Rich console markup for severity coloring in table output.
                sev_color_map = {
                    "CRITICAL": "bold white on red",
                    "HIGH": "bold red",
                    "MEDIUM": "yellow",
                    "LOW": "green",
                    "NOTIFICATION": "cyan",
                }
                sev_display = severity_value
                sev_style = sev_color_map.get(severity_value.upper(), None)
                if sev_style:
                    sev_display = f"[{sev_style}]{severity_value}[/{sev_style}]"

                current_version = detail.get("affectedVersion")

                issue_data = {
                    "Vuln ID": str(vuln.get("id")),
                    "Asset ID": str(asset.get("id") or "-"),
                    "Asset Name": asset.get("name") or "-",
                    "Asset Tags": tags or "-",
                    "Package": package or "-",
                    "Status": vuln.get("status") or "-",
                    "Severity": sev_display,
                    "CVE": cve or "-",
                    "Current Version": current_version or "-",
                }

                query_key = (cve, package, current_version)
                grouped_issues[query_key].append(issue_data)
                total_issues_count += 1

    try:
        # First page is fetched synchronously to learn totalPages.
        page_num, res = _fetch_page(current_page)
        issues_data = res.get("issues") or {}
        _process_collection(issues_data.get("collection") or [])

        total_p = (issues_data.get("metadata") or {}).get("totalPages") or 1

        if fetch_all and total_p > current_page:
            page_numbers = list(range(current_page + 1, total_p + 1))
            page_results = parallel_map(_fetch_page, page_numbers)

            # Sort by page number so processing order is deterministic.
            for _, p_res in sorted(page_results, key=lambda x: x[0]):
                p_coll = (p_res.get("issues") or {}).get("collection") or []
                _process_collection(p_coll)

    except Exception as e:
        error(f"Error fetching vulnerabilities: {e}")
        raise typer.Exit(code=1)

    if total_issues_count == 0:
        success("No open SCA vulnerabilities missing a patched version found.")
        return

    info(f"Found {total_issues_count} SCA vulnerabilities missing patchedVersion. Querying OSV in parallel...")

    def _extract_fixes(affected):
        """Yield (range_type, fixed_version) pairs from one OSV 'affected' entry."""
        ranges = affected.get("ranges", [])

        # database_specific versions nested inside ranges.
        yield from (
            ("DB_SPEC", v["fixed"])
            for r in ranges
            for v in (r.get("database_specific") or {}).get("versions", [])
            if "fixed" in v
        )
        # Standard OSV range events (ECOSYSTEM / SEMVER / GIT).
        yield from (
            (r.get("type"), e["fixed"])
            for r in ranges
            for e in r.get("events", [])
            if "fixed" in e
        )
        # database_specific versions at the affected-entry level.
        yield from (
            ("DB_SPEC", v["fixed"])
            for v in (affected.get("database_specific") or {}).get("versions", [])
            if "fixed" in v
        )

    def _get_best_fixed_version(affected_list, pkg_match=None):
        """Pick the best fixed version: ECOSYSTEM/SEMVER > DB_SPEC > last GIT commit.

        When pkg_match is given, entries naming a different package are ignored
        (entries with no package name are kept).
        """
        def _matches(a):
            name = (a.get("package") or {}).get("name")
            return not pkg_match or not name or name == pkg_match

        all_fixes = list(itertools.chain.from_iterable(
            _extract_fixes(a) for a in affected_list if _matches(a)
        ))

        eco_fixes = {v for t, v in all_fixes if t in ("ECOSYSTEM", "SEMVER")}
        db_spec_fixes = {v for t, v in all_fixes if t == "DB_SPEC"}
        git_fixes = [v for t, v in all_fixes if t == "GIT"]

        # sorted() makes the joined output deterministic (sets have no order).
        if eco_fixes:
            return ", ".join(sorted(eco_fixes))
        if db_spec_fixes:
            return ", ".join(sorted(db_spec_fixes))
        return git_fixes[-1] if git_fixes else None

    # Shared session for connection pooling across parallel OSV calls.
    http_session = requests.Session()

    def _fetch_osv_patch(query_key):
        """Query OSV for one (cve, package, version) key; returns (key, patch_or_None).

        Prefers a direct CVE lookup (following aliases if the primary record has
        no fix); falls back to a package+version query. Errors are logged as
        warnings and treated as "no patch found" (best-effort).
        """
        cve, package, current_version = query_key
        found_patch = None

        try:
            if cve:
                resp = http_session.get(f"https://api.osv.dev/v1/vulns/{cve}", timeout=10)
                if resp.status_code == 200:
                    data = resp.json()
                    found_patch = _get_best_fixed_version(data.get("affected", []))
                    if not found_patch:
                        found_patch = next(
                            (
                                patch
                                for alias in data.get("aliases", [])
                                for alias_resp in [http_session.get(f"https://api.osv.dev/v1/vulns/{alias}", timeout=10)]
                                if alias_resp.status_code == 200
                                for patch in [_get_best_fixed_version(alias_resp.json().get("affected", []))]
                                if patch
                            ),
                            None,
                        )
            elif package and current_version:
                payload = {"version": current_version, "package": {"name": package}}
                resp = http_session.post("https://api.osv.dev/v1/query", json=payload, timeout=10)
                if resp.status_code == 200:
                    found_patch = next(
                        (
                            patch
                            for vuln in resp.json().get("vulns", [])
                            for patch in [_get_best_fixed_version(vuln.get("affected", []), pkg_match=package)]
                            if patch
                        ),
                        None,
                    )
        except Exception as e:
            warning(f"Error querying OSV for {cve or package}: {e}")

        return query_key, found_patch

    raw_results = parallel_map(_fetch_osv_patch, grouped_issues.keys())

    # Expand each OSV answer back onto every finding that shares the query key.
    formatted_issues = [
        {**issue_data, "OSV Patched Version": patch}
        for query_key, patch in raw_results if patch
        for issue_data in grouped_issues[query_key]
    ]

    if not formatted_issues:
        warning("No patched versions found via OSV.")
        return

    export_data(
        data=formatted_issues,
        fmt=fmt.lower(),
        output=output,
        title="OSV Patches Found"
    )
    summary(f"Found patched versions for {len(formatted_issues)} vulnerabilities.")