Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ conviso --help
- Vulnerabilities with local field filter (auto deep for deep fields): `python -m conviso.app vulns list --company-id 443 --all --contains codeSnippet=eval( --contains fileName=app.py`
- Vulnerabilities (DAST/WEB) search by request/response: `python -m conviso.app vulns list --company-id 443 --types DAST_FINDING,WEB_VULNERABILITY --all --contains request=Authorization --contains response=stacktrace`
- Vulnerabilities with forced deep local search: `python -m conviso.app vulns list --company-id 443 --all --contains codeSnippet=eval( --deep-search --workers 8`
- Vulnerabilities (SCA) checking patches against OSV: `python -m conviso.app vulns check-sca-patches --company-id 443 --severities HIGH,CRITICAL --status RISK_ACCEPTED --all`
- Vulnerability timeline (by vulnerability ID): `python -m conviso.app vulns timeline --id 12345`
- Vulnerabilities timeline by project: `python -m conviso.app vulns timeline --company-id 443 --project-id 26102`
- Last user who changed vuln status: `python -m conviso.app vulns timeline --id 12345 --last-status-change-only`
Expand Down
2 changes: 1 addition & 1 deletion src/conviso/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.3.7
0.3.8
290 changes: 290 additions & 0 deletions src/conviso/commands/vulnerabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import json
import re
from datetime import date, datetime, timedelta, timezone
from collections import defaultdict
import itertools
import requests
import time
from conviso.core.notifier import info, error, summary, success, warning, timed_summary
Expand Down Expand Up @@ -2100,3 +2102,291 @@ def clean_source_payload(base):
except Exception as exc:
error(f"Error updating vulnerability: {exc}")
raise typer.Exit(code=1)



# ---------------------- CHECK SCA PATCHES ---------------------- #
@app.command("check-sca-patches", help="Check OSV for available patches for SCA vulnerabilities and optionally update them.")
def check_sca_patches(
company_id: int = typer.Option(..., "--company-id", "-c", help="Company ID."),
asset_ids: Optional[str] = typer.Option(None, "--asset-ids", "-a", help="Comma-separated asset IDs to filter."),
severities: Optional[str] = typer.Option(None, "--severities", "-s", help="Comma-separated severities (NOTIFICATION,LOW,MEDIUM,HIGH,CRITICAL)."),
status: Optional[str] = typer.Option(None,"--status",help="Comma-separated vulnerability status labels."),
asset_tags: Optional[str] = typer.Option(None, "--asset-tags", "-t", help="Comma-separated asset tags."),
cves: Optional[str] = typer.Option(None, "--cves", help="Comma-separated CVE identifiers."),
page: int = typer.Option(1, "--page", "-p", help="Page number."),
per_page: int = typer.Option(50, "--per-page", "-l", help="Items per page."),
all_pages: bool = typer.Option(False, "--all", help="Fetch all pages."),
fmt: str = typer.Option("table", "--format", "-f", help="Output format: table, json, csv."),
output: Optional[str] = typer.Option(None, "--output", "-o", help="Output file for json/csv."),
):
"""Check OSV for available patches for open SCA vulnerabilities without a patched version."""
info(f"Checking SCA patches for company {company_id}...")

def _split_ids(value: Optional[str]):
if not value: return None
ids = []
for raw in value.split(","):
raw = raw.strip()
if not raw: continue
try: ids.append(int(raw))
except ValueError: continue
return ids or None

def _split_strs(value: Optional[str]):
if not value: return None
vals = [v.strip() for v in value.split(",") if v.strip()]
return vals or None

query_sca = """
query IssuesSca($companyId: ID!, $pagination: PaginationInput!, $filters: IssuesFiltersInput) {
issues(companyId: $companyId, pagination: $pagination, filters: $filters) {
collection {
id
title
status
type
asset {
id
name
assetsTagList
}
... on ScaFinding {
severity
detail {
package
affectedVersion
patchedVersion
cve
}
}
}
metadata {
currentPage
totalPages
}
}
}
"""

SEVERITY_ALLOWED = {"NOTIFICATION", "LOW", "MEDIUM", "HIGH", "CRITICAL"}
STATUS_ALLOWED = {"CREATED", "DRAFT", "IDENTIFIED", "IN_PROGRESS", "AWAITING_VALIDATION", "FIX_ACCEPTED", "RISK_ACCEPTED", "FALSE_POSITIVE", "SUPPRESSED"}

assets_list = _split_ids(asset_ids)

severities_list = None
if severities:
severities_list = [s.strip().upper() for s in severities.split(",") if s.strip()]
for s in severities_list:
if s not in SEVERITY_ALLOWED:
error(f"Invalid severity '{s}'. Allowed: {', '.join(SEVERITY_ALLOWED)}")
raise typer.Exit(code=1)

status_list = None
if status:
status_list = [s.strip().upper() for s in status.split(",") if s.strip()]
for s in status_list:
if s not in STATUS_ALLOWED:
error(f"Invalid status '{s}'. Allowed: {', '.join(STATUS_ALLOWED)}")
raise typer.Exit(code=1)

asset_tags_list = _split_strs(asset_tags)
cves_list = _split_strs(cves)

filters = {
"failureTypes": ["SCA_FINDING"],
"statuses": status_list or ["CREATED", "IDENTIFIED", "IN_PROGRESS", "AWAITING_VALIDATION"]
}
if assets_list: filters["assetIds"] = assets_list
if severities_list: filters["severities"] = severities_list
if asset_tags_list: filters["assetTags"] = asset_tags_list
if cves_list: filters["cves"] = cves_list

fetch_all = all_pages
current_page = page

vars_base = {
"companyId": str(company_id),
"pagination": {"page": current_page, "perPage": per_page if not fetch_all else 200},
"filters": filters
}

grouped_issues = defaultdict(list)
total_issues_count = 0

def _fetch_page(page_num: int):
vars_page = dict(vars_base)
vars_page["pagination"]["page"] = page_num
res = graphql_request(query_sca, vars_page, log_request=True, verbose_only=True)
return page_num, res

def _process_collection(collection):
nonlocal total_issues_count
for vuln in collection:
if vuln.get("type") != "SCA_FINDING":
continue

detail = vuln.get("detail") or {}
patched_ver = detail.get("patchedVersion")
cve = detail.get("cve")
package = detail.get("package")

if not patched_ver and (cve or package):
asset = vuln.get("asset") or {}
tags = ", ".join(asset.get("assetsTagList") or [])
severity_value = vuln.get("severity") or ""

sev_color_map = {
"CRITICAL": "bold white on red",
"HIGH": "bold red",
"MEDIUM": "yellow",
"LOW": "green",
"NOTIFICATION": "cyan"
}
sev_display = severity_value
sev_style = sev_color_map.get(severity_value.upper(), None)
if sev_style:
sev_display = f"[{sev_style}]{severity_value}[/{sev_style}]"

current_version = detail.get("affectedVersion")

issue_data = {
"Vuln ID": str(vuln.get("id")),
"Asset ID": str(asset.get("id") or "-"),
"Asset Name": asset.get("name") or "-",
"Asset Tags": tags or "-",
"Package": package or "-",
"Status": vuln.get("status") or "-",
"Severity": sev_display,
"CVE": cve or "-",
"Current Version": current_version or "-",
}

query_key = (cve, package, current_version)
grouped_issues[query_key].append(issue_data)
total_issues_count += 1

try:
page_num, res = _fetch_page(current_page)
issues_data = res.get("issues") or {}
_process_collection(issues_data.get("collection") or [])

total_p = (issues_data.get("metadata") or {}).get("totalPages") or 1

if fetch_all and total_p > current_page:
page_numbers = list(range(current_page + 1, total_p + 1))
page_results = parallel_map(_fetch_page, page_numbers)

for _, p_res in sorted(page_results, key=lambda x: x[0]):
p_coll = (p_res.get("issues") or {}).get("collection") or []
_process_collection(p_coll)

except Exception as e:
error(f"Error fetching vulnerabilities: {e}")
raise typer.Exit(code=1)

if total_issues_count == 0:
success("No open SCA vulnerabilities missing a patched version found.")
return

info(f"Found {total_issues_count} SCA vulnerabilities missing patchedVersion. Querying OSV in parallel...")

def _extract_fixes(affected):
ranges = affected.get("ranges", [])

yield from (
("DB_SPEC", v["fixed"])
for r in ranges
for v in (r.get("database_specific") or {}).get("versions", [])
if "fixed" in v
)
yield from (
(r.get("type"), e["fixed"])
for r in ranges
for e in r.get("events", [])
if "fixed" in e
)
yield from (
("DB_SPEC", v["fixed"])
for v in (affected.get("database_specific") or {}).get("versions", [])
if "fixed" in v
)

def _get_best_fixed_version(affected_list, pkg_match=None):
def _matches(a):
name = (a.get("package") or {}).get("name")
return not pkg_match or not name or name == pkg_match

all_fixes = list(itertools.chain.from_iterable(
_extract_fixes(a) for a in affected_list if _matches(a)
))

eco_fixes = {v for t, v in all_fixes if t in ("ECOSYSTEM", "SEMVER")}
db_spec_fixes = {v for t, v in all_fixes if t == "DB_SPEC"}
git_fixes = [v for t, v in all_fixes if t == "GIT"]

if eco_fixes: return ", ".join(eco_fixes)
if db_spec_fixes: return ", ".join(db_spec_fixes)
return git_fixes[-1] if git_fixes else None

http_session = requests.Session()

def _fetch_osv_patch(query_key):
cve, package, current_version = query_key
found_patch = None

try:
if cve:
resp = http_session.get(f"https://api.osv.dev/v1/vulns/{cve}", timeout=10)
if resp.status_code == 200:
data = resp.json()
found_patch = _get_best_fixed_version(data.get("affected", []))
if not found_patch:
found_patch = next(
(
patch
for alias in data.get("aliases", [])
for alias_resp in [http_session.get(f"https://api.osv.dev/v1/vulns/{alias}", timeout=10)]
if alias_resp.status_code == 200
for patch in [_get_best_fixed_version(alias_resp.json().get("affected", []))]
if patch
),
None,
)
elif package and current_version:
payload = {"version": current_version, "package": {"name": package}}
resp = http_session.post("https://api.osv.dev/v1/query", json=payload, timeout=10)
if resp.status_code == 200:
found_patch = next(
(
patch
for vuln in resp.json().get("vulns", [])
for patch in [_get_best_fixed_version(vuln.get("affected", []), pkg_match=package)]
if patch
),
None,
)
except Exception as e:
warning(f"Error querying OSV for {cve or package}: {e}")

return query_key, found_patch

raw_results = parallel_map(_fetch_osv_patch, grouped_issues.keys())

formatted_issues = [
{**issue_data, "OSV Patched Version": patch}
for query_key, patch in raw_results if patch
for issue_data in grouped_issues[query_key]
]

if not formatted_issues:
warning("No patched versions found via OSV.")
return

export_data(
data=formatted_issues,
fmt=fmt.lower(),
output=output,
title="OSV Patches Found"
)
summary(f"Found patched versions for {len(formatted_issues)} vulnerabilities.")
Loading