diff --git a/README.md b/README.md
index f1c4ead..bb725a6 100644
--- a/README.md
+++ b/README.md
@@ -98,6 +98,10 @@ conviso --help
 - Vulnerabilities: `python -m conviso.app vulns list --company-id 443 --severities HIGH,CRITICAL --asset-tags cloud --all`
 - Vulnerabilities (last 7 days): `python -m conviso.app vulns list --company-id 443 --days-back 7 --severities HIGH,CRITICAL --all`
 - Vulnerabilities by author: `python -m conviso.app vulns list --company-id 443 --author "Fernando" --all`
+- Vulnerabilities with local free-text search: `python -m conviso.app vulns list --company-id 443 --all --grep "jwt"`
+- Vulnerabilities with a local field filter (deep search auto-enabled for deep fields): `python -m conviso.app vulns list --company-id 443 --all --contains codeSnippet=eval( --contains fileName=app.py`
+- Vulnerabilities (DAST/WEB) search by request/response: `python -m conviso.app vulns list --company-id 443 --types DAST_FINDING,WEB_VULNERABILITY --all --contains request=Authorization --contains response=stacktrace`
+- Vulnerabilities with forced deep local search: `python -m conviso.app vulns list --company-id 443 --all --contains codeSnippet=eval( --deep-search --workers 8`
 - Vulnerability timeline (by vulnerability ID): `python -m conviso.app vulns timeline --id 12345`
 - Vulnerabilities timeline by project: `python -m conviso.app vulns timeline --company-id 443 --project-id 26102`
 - Last user who changed vuln status: `python -m conviso.app vulns timeline --id 12345 --last-status-change-only`
@@ -105,12 +109,21 @@ conviso --help
 - Last user who changed vuln status to ANALYSIS: `python -m conviso.app vulns timeline --id 12345 --status ANALYSIS --last-status-change-only`
 
 Output options: `--format table|json|csv`, `--output path` to save JSON/CSV.
+Global performance option: `--workers <N>` sets the default number of parallel workers for all commands (e.g. `python -m conviso.app --workers 16 vulns list ...`).
+Global output options:
+- `--repeat-header <N>` repeats table headers every N rows (helps when scrolling long output).
+- `--columns <col1,col2,...>` selects the columns for table/CSV output across commands (unknown columns are ignored).
 
 Notes:
 - `projects list --filter` supports `assignee=` to filter by allocated analyst.
 - GraphQL errors return exit code 1.
 - Use `--all` on list commands to fetch every page.
 - `--quiet` silences info logs; `--verbose` shows per-page requests when paginating.
+- `--workers` sets the default number of parallel workers across commands; a command-level `--workers` (when available) overrides it.
+- `--repeat-header` and `--columns` are global and apply to every command that produces table/CSV output.
+- In `vulns list`, `--contains` on deep fields (`codeSnippet`, `fileName`, `vulnerableLine`, `request`, `response`, `url`, `method`, `parameters`) automatically enables deep search.
+- `--deep-search` remains available as a manual override; `--resolve-snippet-urls` applies whenever deep search is active (manual or automatic).
+- When `--grep` or `--contains` is used, the listing includes a `Matched In` column indicating which fields triggered the match.
 - On startup the CLI checks for a newer version (via https://raw.githubusercontent.com/convisolabs/conviso-cli/main/VERSION). Set `CONVISO_CLI_SKIP_UPDATE_CHECK=1` to skip.
 - When offline, the check warns and you can force the comparison by setting `CONVISO_CLI_REMOTE_VERSION` (manual override).
 - Upgrade: `python -m conviso.app upgrade` (equiv. `conviso upgrade`) runs `git pull --ff-only` in the repo directory; if installed via pip, run `pip install .` after the pull.
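As a sketch of how the pieces documented above fit together, a hypothetical invocation combining the global options with the new local search flags could look like the following (all flag names come from the README/help text in this change; the company ID, worker count, and column list are illustrative values only):

```bash
# Illustrative only: global --workers/--repeat-header/--columns combined with local free-text search.
python -m conviso.app --workers 16 --repeat-header 25 --columns id,title,status \
  vulns list --company-id 443 --all --grep "jwt"

# Field-specific local filter; deep fields such as fileName or codeSnippet auto-enable deep search
# (quote values containing shell metacharacters).
python -m conviso.app vulns list --company-id 443 --all \
  --contains fileName=app.py --contains "codeSnippet=eval(" \
  --format csv --output vulns.csv
```
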
diff --git a/src/conviso/VERSION b/src/conviso/VERSION index 1c09c74..42045ac 100644 --- a/src/conviso/VERSION +++ b/src/conviso/VERSION @@ -1 +1 @@ -0.3.3 +0.3.4 diff --git a/src/conviso/app.py b/src/conviso/app.py index 8e5bcf8..ac9a8b6 100644 --- a/src/conviso/app.py +++ b/src/conviso/app.py @@ -7,10 +7,13 @@ from conviso.commands import sbom from conviso.commands import tasks from conviso.core.logger import log, set_verbosity +from conviso.core.concurrency import set_default_workers +from conviso.core.output_prefs import set_output_preferences from conviso.core.notifier import info, warning from conviso.core.version import check_for_updates, DEFAULT_REMOTE_URL, read_local_version import subprocess import os +from typing import Optional app = typer.Typer(help="Conviso Platform CLI") @@ -27,8 +30,21 @@ def main( ctx: typer.Context, quiet: bool = typer.Option(False, "--quiet", help="Silence non-error output."), verbose: bool = typer.Option(False, "--verbose", help="Show verbose logs (GraphQL requests, etc.)."), + workers: int = typer.Option(8, "--workers", help="Default worker threads for parallel operations across commands."), + repeat_header_every: int = typer.Option( + 0, + "--repeat-header", + help="Repeat table headers every N rows (global output option). 0 disables.", + ), + columns: Optional[str] = typer.Option( + None, + "--columns", + help="Comma-separated columns for table/csv output (global output option). Example: --columns id,title,status", + ), ): set_verbosity(quiet=quiet, verbose=verbose) + set_default_workers(workers) + set_output_preferences(repeat_header_every=repeat_header_every, columns=columns) if ctx.resilient_parsing: return diff --git a/src/conviso/clients/client_graphql.py b/src/conviso/clients/client_graphql.py index ebbbc45..3920901 100644 --- a/src/conviso/clients/client_graphql.py +++ b/src/conviso/clients/client_graphql.py @@ -17,6 +17,17 @@ API_KEY = os.getenv("CONVISO_API_KEY") DEFAULT_TIMEOUT = float(os.getenv("CONVISO_API_TIMEOUT", "30")) DEFAULT_RETRIES = int(os.getenv("CONVISO_API_RETRIES", "2")) +POOL_CONNECTIONS = int(os.getenv("CONVISO_API_POOL_CONNECTIONS", "32")) +POOL_MAXSIZE = int(os.getenv("CONVISO_API_POOL_MAXSIZE", "64")) + +SESSION = requests.Session() +ADAPTER = requests.adapters.HTTPAdapter( + pool_connections=POOL_CONNECTIONS, + pool_maxsize=POOL_MAXSIZE, + max_retries=0, +) +SESSION.mount("https://", ADAPTER) +SESSION.mount("http://", ADAPTER) def graphql_request(query: str, variables: dict = None, log_request: bool = True, verbose_only: bool = False) -> dict: @@ -39,7 +50,7 @@ def graphql_request(query: str, variables: dict = None, log_request: bool = True last_exc = None for attempt in range(DEFAULT_RETRIES + 1): try: - response = requests.post(API_URL, json=payload, headers=headers, timeout=DEFAULT_TIMEOUT) + response = SESSION.post(API_URL, json=payload, headers=headers, timeout=DEFAULT_TIMEOUT) response.raise_for_status() data = response.json() if "errors" in data: @@ -92,7 +103,7 @@ def graphql_request_upload( with open(file_path, "rb") as f: files = {"0": f} - response = requests.post( + response = SESSION.post( API_URL, data={"operations": json.dumps(operations), "map": json.dumps(map_part)}, files=files, diff --git a/src/conviso/commands/assets.py b/src/conviso/commands/assets.py index b81fa8d..90ef23f 100644 --- a/src/conviso/commands/assets.py +++ b/src/conviso/commands/assets.py @@ -7,8 +7,10 @@ """ import typer +import time from typing import Optional -from conviso.core.notifier import info, success, error, summary, warning 
+from conviso.core.notifier import info, success, error, summary, warning, timed_summary +from conviso.core.validators import validate_choice, validate_csv_choices from conviso.clients.client_graphql import graphql_request from conviso.schemas.assets_schema import schema from conviso.core.output_manager import export_data @@ -34,6 +36,8 @@ def list_assets( ): """List all assets for a specific company.""" info(f"Listing assets for company {company_id} (page {page}, limit {limit})...") + started_at = time.perf_counter() + fmt_lower = fmt.lower() query = """ query Assets($companyId: ID!, $limit: Int, $page: Int, $search: AssetsSearch) { @@ -62,6 +66,8 @@ def list_assets( BUSINESS_IMPACT_ALLOWED = {"LOW", "MEDIUM", "HIGH", "NOT_DEFINED"} ATTACK_SURFACE_ALLOWED = {"INTERNET_FACING", "INTERNAL", "NOT_DEFINED"} + THREAT_ALLOWED = {"CRITICAL", "HIGH", "MEDIUM", "LOW", "NOTIFICATION"} + DATA_CLASS_ALLOWED = {"PII", "PAYMENT_CARD_INDUSTRY", "NON_SENSITIVE", "NOT_DEFINED"} def _split_list(value: Optional[str], upper: bool = False, allowed: Optional[set] = None, label: str = ""): if not value: @@ -78,12 +84,21 @@ def _split_list(value: Optional[str], upper: bool = False, allowed: Optional[set items.append(v) return items or None + try: + validated_business = validate_csv_choices(business_impact, BUSINESS_IMPACT_ALLOWED, "--business-impact") + validated_data_class = validate_csv_choices(data_classification, DATA_CLASS_ALLOWED, "--data-classification") + validated_attack_surface = validate_csv_choices(attack_surface, ATTACK_SURFACE_ALLOWED, "--attack-surface") + validated_threat = validate_csv_choices(threat, THREAT_ALLOWED, "--threat") + except ValueError as exc: + error(str(exc)) + raise typer.Exit(code=1) + search_filters = { "tags": _split_list(tags), - "businessImpact": _split_list(business_impact, upper=True, allowed=BUSINESS_IMPACT_ALLOWED, label="business impact"), - "dataClassification": _split_list(data_classification), - "exploitability": _split_list(attack_surface, upper=True, allowed=ATTACK_SURFACE_ALLOWED, label="attack surface"), - "threat": _split_list(threat, upper=True), + "businessImpact": validated_business, + "dataClassification": validated_data_class, + "exploitability": validated_attack_surface, + "threat": validated_threat, } if env_compromised: search_filters["environmentCompromised"] = True @@ -137,7 +152,9 @@ def _split_list(value: Optional[str], upper: bool = False, allowed: Optional[set "LOW": "green", "NOT_DEFINED": "dim", }.get(str(impact).upper()) - impact_display = f"[{impact_color}]{impact}[/{impact_color}]" if impact_color else impact + impact_display = impact + if fmt_lower == "table" and impact_color: + impact_display = f"[{impact_color}]{impact}[/{impact_color}]" rows.append({ "id": a.get("id") or "", @@ -165,7 +182,8 @@ def _split_list(value: Optional[str], upper: bool = False, allowed: Optional[set title=f"Assets (Company {company_id}) - Page {page}/{total_pages or '?'}", ) - summary(f"{len(rows)} asset(s) listed out of {total_count or len(rows)} total.\n") + elapsed = time.perf_counter() - started_at + timed_summary(f"{len(rows)} asset(s) listed out of {total_count or len(rows)} total", elapsed) except Exception as e: error(f"Error listing assets: {e}") @@ -187,33 +205,19 @@ def create_asset( info(f"Creating new asset '{name}' for company {company_id}...") BUSINESS_IMPACT_ALLOWED = {"LOW", "MEDIUM", "HIGH", "NOT_DEFINED"} - DATA_CLASS_ALLOWED = {"PERSONALLY_IDENTIFIABLE_INFORMATION", "PAYMENT_CARD_INDUSTRY", "NON_SENSITIVE", "NOT_DEFINED"} - - def 
_parse_business(value: Optional[str]): - if not value: - return None - up = value.strip().upper() - if up not in BUSINESS_IMPACT_ALLOWED: - warning(f"Ignoring invalid business impact: {value}") - return None - return up - - def _parse_data_class(value: Optional[str]): - if not value: - return None - vals = [] - for raw in value.split(","): - v = raw.strip().upper() - if not v: - continue - if v not in DATA_CLASS_ALLOWED: - warning(f"Ignoring invalid data classification: {raw}") - continue - vals.append(v) - return vals or None + DATA_CLASS_ALLOWED = {"PII", "PERSONALLY_IDENTIFIABLE_INFORMATION", "PAYMENT_CARD_INDUSTRY", "NON_SENSITIVE", "NOT_DEFINED"} - parsed_business = _parse_business(business_impact) - parsed_data_class = _parse_data_class(data_classification) + try: + parsed_business = validate_choice(business_impact, BUSINESS_IMPACT_ALLOWED, "--business-impact") + parsed_data_class = validate_csv_choices(data_classification, DATA_CLASS_ALLOWED, "--data-classification") + except ValueError as exc: + error(str(exc)) + raise typer.Exit(code=1) + if parsed_data_class: + parsed_data_class = [ + "PERSONALLY_IDENTIFIABLE_INFORMATION" if v == "PII" else v + for v in parsed_data_class + ] mutation = """ mutation CreateAsset($input: CreateAssetInput!) { @@ -266,33 +270,19 @@ def update_asset( info(f"Updating asset ID {asset_id} in company {company_id}...") BUSINESS_IMPACT_ALLOWED = {"LOW", "MEDIUM", "HIGH", "NOT_DEFINED"} - DATA_CLASS_ALLOWED = {"PERSONALLY_IDENTIFIABLE_INFORMATION", "PAYMENT_CARD_INDUSTRY", "NON_SENSITIVE", "NOT_DEFINED"} + DATA_CLASS_ALLOWED = {"PII", "PERSONALLY_IDENTIFIABLE_INFORMATION", "PAYMENT_CARD_INDUSTRY", "NON_SENSITIVE", "NOT_DEFINED"} - def _parse_business(value: Optional[str]): - if value is None: - return None - up = value.strip().upper() - if up not in BUSINESS_IMPACT_ALLOWED: - warning(f"Ignoring invalid business impact: {value}") - return None - return up - - def _parse_data_class(value: Optional[str]): - if value is None: - return None - vals = [] - for raw in value.split(","): - v = raw.strip().upper() - if not v: - continue - if v not in DATA_CLASS_ALLOWED: - warning(f"Ignoring invalid data classification: {raw}") - continue - vals.append(v) - return vals or None - - parsed_business = _parse_business(business_impact) - parsed_data_class = _parse_data_class(data_classification) + try: + parsed_business = validate_choice(business_impact, BUSINESS_IMPACT_ALLOWED, "--business-impact") + parsed_data_class = validate_csv_choices(data_classification, DATA_CLASS_ALLOWED, "--data-classification") + except ValueError as exc: + error(str(exc)) + raise typer.Exit(code=1) + if parsed_data_class: + parsed_data_class = [ + "PERSONALLY_IDENTIFIABLE_INFORMATION" if v == "PII" else v + for v in parsed_data_class + ] mutation = """ mutation UpdateAsset($input: UpdateAssetInput!) 
{ diff --git a/src/conviso/commands/bulk.py b/src/conviso/commands/bulk.py index 6254a82..80f5e1e 100644 --- a/src/conviso/commands/bulk.py +++ b/src/conviso/commands/bulk.py @@ -10,6 +10,7 @@ from rich.table import Table from conviso.core.notifier import info, success, error, warning from conviso.core.bulk_loader import load_csv, bulk_process, SkipRow, BulkResult +from conviso.core.concurrency import parallel_map from conviso.clients.client_graphql import graphql_request from conviso.core.logger import VERBOSE from conviso.core.output_manager import console @@ -575,11 +576,12 @@ def _create_asset(name: str) -> Optional[int]: missing_assets = [nm for nm in asset_names if not _resolve_asset_by_name(nm)] info(f"Assets referenced in SARIF: {len(asset_names)}. Resolved by name: {len(resolved_assets)}. Missing: {len(missing_assets)}.") - # Auto-create missing assets by name - for nm in missing_assets: - created = _create_asset(nm) - if created: - resolved_assets[nm] = created + # Auto-create missing assets by name (parallel fan-out) + if missing_assets: + created_ids = parallel_map(_create_asset, missing_assets) + for nm, created in zip(missing_assets, created_ids): + if created: + resolved_assets[nm] = created # Recompute missing after creation attempts missing_assets = [nm for nm in asset_names if not _resolve_asset_by_name(nm)] diff --git a/src/conviso/commands/projects.py b/src/conviso/commands/projects.py index c7ada88..2df1307 100644 --- a/src/conviso/commands/projects.py +++ b/src/conviso/commands/projects.py @@ -7,11 +7,14 @@ """ import math +import time import typer from typing import Optional, List from datetime import datetime, timezone -from conviso.core.notifier import info, success, error, summary, warning +from conviso.core.notifier import info, success, error, summary, warning, timed_summary from conviso.clients.client_graphql import graphql_request +from conviso.core.concurrency import parallel_map +from conviso.core.validators import validate_choice from conviso.schemas.projects_schema import schema from conviso.schemas.project_requirements_activities_schema import schema as project_requirements_schema from conviso.core.output_manager import export_data @@ -43,6 +46,7 @@ def list_projects( ): """List projects for a given company using the unified output manager.""" info(f"Listing projects for company {company_id} (page {page}, limit {limit})...") + started_at = time.perf_counter() # Build search parameters params = {"scopeIdEq": company_id} @@ -113,21 +117,8 @@ def list_projects( rows = [] total_pages = None total_count = 0 - while True: - variables["page"] = current_page - data = graphql_request(query, variables, log_request=True, verbose_only=all_pages) - projects_data = data["projects"] - collection = projects_data["collection"] - metadata = projects_data["metadata"] - total_pages = metadata.get("totalPages") - total_count = metadata.get("totalCount", total_count) - - if not collection: - if current_page == page: - typer.echo("⚠️ No projects found.") - raise typer.Exit() - break - + def _append_rows_from_collection(collection): + nonlocal rows for p in collection: assignees = [] for alloc in p.get("allocatedAnalyst") or []: @@ -173,9 +164,43 @@ def list_projects( "tags": tags_str, }) - if not fetch_all or (total_pages is not None and current_page >= total_pages): - break - current_page += 1 + def _fetch_page(page_num: int): + vars_page = dict(variables) + vars_page["page"] = page_num + data_page = graphql_request(query, vars_page, log_request=True, verbose_only=all_pages) + 
projects_page = data_page["projects"] + collection_page = projects_page.get("collection") or [] + metadata_page = projects_page.get("metadata") or {} + return page_num, collection_page, metadata_page + + page_num, collection, metadata = _fetch_page(current_page) + total_pages = metadata.get("totalPages") + total_count = metadata.get("totalCount", total_count) + + if not collection: + typer.echo("⚠️ No projects found.") + raise typer.Exit() + + _append_rows_from_collection(collection) + + if fetch_all: + if total_pages is not None and current_page < total_pages: + page_numbers = list(range(current_page + 1, total_pages + 1)) + page_results = parallel_map(_fetch_page, page_numbers) + for _, page_collection, _ in sorted(page_results, key=lambda x: x[0]): + if page_collection: + _append_rows_from_collection(page_collection) + else: + # Metadata missing: keep sequential pagination fallback + while True: + current_page += 1 + _, page_collection, page_metadata = _fetch_page(current_page) + if not page_collection: + break + _append_rows_from_collection(page_collection) + total_pages = page_metadata.get("totalPages") + if total_pages is not None and current_page >= total_pages: + break if not rows: typer.echo("⚠️ No projects found.") @@ -198,9 +223,11 @@ def list_projects( total_pages_calc = math.ceil(total / effective_limit) - summary( + elapsed = time.perf_counter() - started_at + timed_summary( f"Showing {start}-{end} of {total} " - f"(page {page}/{total_pages_calc}).\n" + f"(page {page}/{total_pages_calc})", + elapsed, ) except typer.Exit: @@ -297,9 +324,11 @@ def _safe_parse_iso(value: Optional[str]) -> Optional[datetime]: except Exception: return None - status_filter = status.strip().upper() if status else None - if status_filter and status_filter not in allowed_status: - warning(f"Unknown status '{status_filter}'. 
Allowed: {', '.join(sorted(allowed_status))}") + try: + status_filter = validate_choice(status, allowed_status, "--status") + except ValueError as exc: + error(str(exc)) + raise typer.Exit(code=1) history_email_filter = (history_email or "").strip().lower() or None attachment_name_filter = (attachment_name or "").strip().lower() or None diff --git a/src/conviso/commands/requirements.py b/src/conviso/commands/requirements.py index 5145e1b..2081b13 100644 --- a/src/conviso/commands/requirements.py +++ b/src/conviso/commands/requirements.py @@ -6,8 +6,9 @@ """ import typer +import time from typing import Optional -from conviso.core.notifier import info, error, success, summary, warning +from conviso.core.notifier import info, error, success, summary, warning, timed_summary from conviso.clients.client_graphql import graphql_request from conviso.core.output_manager import export_data from conviso.schemas.requirements_schema import schema @@ -27,6 +28,7 @@ def list_requirements( ): """List requirements for a scope.""" info(f"Listing requirements for company {company_id} (page {page}, per_page {per_page})...") + started_at = time.perf_counter() query = """ query Requirements($scopeId: Int!, $pagination: BasePaginationInput!, $filters: RequirementsFilterInput) { @@ -81,7 +83,8 @@ def list_requirements( output=output, title=f"Requirements (Company {company_id}) - Page {page}/{metadata.get('totalPages')}", ) - summary(f"{len(collection)} requirement(s) listed out of {metadata.get('totalCount')}.") + elapsed = time.perf_counter() - started_at + timed_summary(f"{len(collection)} requirement(s) listed out of {metadata.get('totalCount')}", elapsed) except Exception as e: error(f"Error listing requirements: {e}") @@ -101,6 +104,7 @@ def list_project_requirements( ): """List requirements associated with a project.""" info(f"Listing requirements for project {project_id} in company {company_id}...") + started_at = time.perf_counter() query_with_activities = """ query ProjectRequirements($id: ID!) { @@ -196,9 +200,11 @@ def list_project_requirements( title=f"Requirements (Project {project_id}) - {project.get('label') or ''}".strip(), ) if with_activities: - summary(f"{len(rows)} activit(ies) listed for project {project_id}.") + elapsed = time.perf_counter() - started_at + timed_summary(f"{len(rows)} activit(ies) listed for project {project_id}", elapsed) else: - summary(f"{len(collection)} requirement(s) listed for project {project_id}.") + elapsed = time.perf_counter() - started_at + timed_summary(f"{len(collection)} requirement(s) listed for project {project_id}", elapsed) except Exception as e: error(f"Error listing project requirements: {e}") @@ -225,6 +231,7 @@ def list_requirement_activities( info(f"Listing activities for requirement {requirement_id} in company {company_id}...") else: info(f"Listing activities for requirements in project {project_id} (company {company_id})...") + started_at = time.perf_counter() requirement_query = """ query Requirement($companyId: ID!, $id: ID!) 
{ @@ -287,7 +294,8 @@ def list_requirement_activities( output=output, title=f"Activities (Requirement {requirement_id}) - {req.get('label') or ''}".strip(), ) - summary(f"{len(collection)} activit(ies) listed for requirement {requirement_id}.") + elapsed = time.perf_counter() - started_at + timed_summary(f"{len(collection)} activit(ies) listed for requirement {requirement_id}", elapsed) else: data = graphql_request(project_query, {"id": project_id}) project = data.get("project") or {} @@ -315,7 +323,8 @@ def list_requirement_activities( output=output, title=f"Activities (Project {project_id}) - {project.get('label') or ''}".strip(), ) - summary(f"{len(rows)} activit(ies) listed for project {project_id}.") + elapsed = time.perf_counter() - started_at + timed_summary(f"{len(rows)} activit(ies) listed for project {project_id}", elapsed) except Exception as e: error(f"Error listing requirement activities: {e}") diff --git a/src/conviso/commands/sbom.py b/src/conviso/commands/sbom.py index 27d3734..059ff7f 100644 --- a/src/conviso/commands/sbom.py +++ b/src/conviso/commands/sbom.py @@ -6,8 +6,10 @@ """ import typer +import time from typing import Optional -from conviso.core.notifier import info, error, summary, success +from conviso.core.notifier import info, error, summary, success, timed_summary +from conviso.core.validators import validate_choice from conviso.clients.client_graphql import graphql_request, graphql_request_upload from conviso.core.output_manager import export_data from conviso.schemas.sbom_schema import schema as sbom_schema @@ -36,6 +38,7 @@ def list_sbom( """ List SBOM components for a company. """ + started_at = time.perf_counter() query = """ query SbomComponents($companyId: ID!, $search: SbomComponentSearchInput, $page: Int, $limit: Int) { sbomComponents(companyId: $companyId, search: $search, page: $page, limit: $limit) { @@ -69,7 +72,11 @@ def list_sbom( if sort_by: search["sortBy"] = sort_by if order: - search["order"] = order + try: + search["order"] = validate_choice(order, {"ASC", "DESC"}, "--order") + except ValueError as exc: + error(str(exc)) + raise typer.Exit(code=1) variables = { "companyId": str(company_id), @@ -201,7 +208,8 @@ def _format_issues_by_severity(issues_val): summary(f"CycloneDX exported to {output}") else: print(payload) - summary(f"{len(rows)} component(s) listed out of {total_count or len(rows)}.") + elapsed = time.perf_counter() - started_at + timed_summary(f"{len(rows)} component(s) listed out of {total_count or len(rows)}", elapsed) else: export_data( rows, @@ -210,7 +218,8 @@ def _format_issues_by_severity(issues_val): output=output, title=f"SBOM Components (Company {company_id}) - Page {page}/{total_pages or '?'}", ) - summary(f"{len(rows)} component(s) listed out of {total_count or len(rows)}.") + elapsed = time.perf_counter() - started_at + timed_summary(f"{len(rows)} component(s) listed out of {total_count or len(rows)}", elapsed) except Exception as exc: error(f"Error listing SBOM components: {exc}") raise typer.Exit(code=1) diff --git a/src/conviso/commands/tasks.py b/src/conviso/commands/tasks.py index b7eaefb..dccf786 100644 --- a/src/conviso/commands/tasks.py +++ b/src/conviso/commands/tasks.py @@ -21,7 +21,8 @@ import typer from conviso.clients.client_graphql import graphql_request -from conviso.core.notifier import error, info, summary, warning, success +from conviso.core.concurrency import parallel_map +from conviso.core.notifier import error, info, summary, warning, success, timed_summary try: import yaml @@ -74,6 +75,7 @@ def 
_approve_command(cmd: str): @approvals_app.command("list") def list_approvals(): """List locally approved task commands.""" + started_at = time.perf_counter() approvals = _load_approved_commands() if not approvals: info("No approved commands found.") @@ -85,7 +87,8 @@ def list_approvals(): rows.append({"hash": key, "approvedAt": approved_at, "cmd": cmd}) for row in rows: typer.echo(f"{row['hash']} {row['approvedAt']} {row['cmd']}") - summary(f"{len(rows)} approved command(s).") + elapsed = time.perf_counter() - started_at + timed_summary(f"{len(rows)} approved command(s)", elapsed) @approvals_app.command("clear") @@ -508,15 +511,22 @@ def _fetch_assets(company_id: int, tags: Optional[str] = None) -> List[Dict[str, limit = 50 all_assets = [] search = {"tags": [t.strip() for t in tags.split(",") if t.strip()]} if tags else None - while True: - data = graphql_request(query, {"companyId": company_id, "limit": limit, "page": page, "search": search}) - assets = (data.get("assets") or {}).get("collection") or [] - meta = (data.get("assets") or {}).get("metadata") or {} - all_assets.extend(assets) - total_pages = meta.get("totalPages") or 1 - if page >= total_pages: - break - page += 1 + + def _fetch_page(p: int) -> tuple[int, List[Dict[str, Any]], int]: + data = graphql_request(query, {"companyId": company_id, "limit": limit, "page": p, "search": search}) + assets_data = data.get("assets") or {} + collection = assets_data.get("collection") or [] + meta = assets_data.get("metadata") or {} + total_pages = int(meta.get("totalPages") or 1) + return p, collection, total_pages + + first_page, first_collection, total_pages = _fetch_page(page) + all_assets.extend(first_collection) + if first_page < total_pages: + page_numbers = list(range(first_page + 1, total_pages + 1)) + page_results = parallel_map(_fetch_page, page_numbers) + for _, collection, _ in sorted(page_results, key=lambda x: x[0]): + all_assets.extend(collection) return all_assets @@ -1309,6 +1319,7 @@ def list_tasks( ): """List tasks defined as YAML in requirement activities.""" _require_yaml() + started_at = time.perf_counter() rows: List[Dict[str, Any]] = [] @@ -1366,7 +1377,8 @@ def list_tasks( output=output, title=f"Tasks ({'Project ' + str(project_id) if project_id else 'Templates'})", ) - summary(f"{len(rows)} task(s) listed.") + elapsed = time.perf_counter() - started_at + timed_summary(f"{len(rows)} task(s) listed", elapsed) @app.command("run") diff --git a/src/conviso/commands/vulnerabilities.py b/src/conviso/commands/vulnerabilities.py index 4aca2a0..aa85090 100644 --- a/src/conviso/commands/vulnerabilities.py +++ b/src/conviso/commands/vulnerabilities.py @@ -6,11 +6,15 @@ """ import typer -from typing import Optional +from typing import Optional, List, Tuple import json import re from datetime import date, datetime, timedelta, timezone -from conviso.core.notifier import info, error, summary, success, warning +import requests +import time +from conviso.core.notifier import info, error, summary, success, warning, timed_summary +from conviso.core.concurrency import parallel_map, resolve_workers +from conviso.core.validators import validate_choice, validate_csv_choices from conviso.clients.client_graphql import graphql_request from conviso.core.output_manager import export_data from conviso.schemas.vulnerabilities_schema import schema @@ -28,7 +32,7 @@ def list_vulnerabilities( asset_tags: Optional[str] = typer.Option(None, "--asset-tags", "-t", help="Comma-separated asset tags."), project_types: Optional[str] = typer.Option(None, 
"--project-types", help="Comma-separated project types (e.g. PENETRATION_TEST, WEB_PENETRATION_TESTING)."), cves: Optional[str] = typer.Option(None, "--cves", help="Comma-separated CVE identifiers."), - issue_types: Optional[str] = typer.Option(None, "--types", help="Comma-separated failure types (e.g. WEB_VULNERABILITY, DAST_FINDING, SAST_FINDING, SOURCE_CODE_VULNERABILITY, NETWORK_VULNERABILITY, SCA_FINDING)."), + issue_types: Optional[str] = typer.Option(None, "--types", help="Comma-separated failure types (e.g. WEB_VULNERABILITY, DAST_FINDING, SAST_FINDING, SOURCE_CODE_VULNERABILITY, NETWORK_VULNERABILITY, SCA_FINDING, IAC_FINDING, SECRET_FINDING, CONTAINER_FINDING)."), days_back: Optional[int] = typer.Option(None, "--days-back", help="Filter by created date in the last N days (sets --created-start automatically)."), created_start: Optional[str] = typer.Option(None, "--created-start", help="Created at >= (YYYY-MM-DD)."), created_end: Optional[str] = typer.Option(None, "--created-end", help="Created at <= (YYYY-MM-DD)."), @@ -40,6 +44,27 @@ def list_vulnerabilities( exploitability: Optional[str] = typer.Option(None, "--attack-surface", "-A", help="Attack surface (INTERNET_FACING,INTERNAL,NOT_DEFINED)."), assignee_emails: Optional[str] = typer.Option(None, "--assignees", help="Comma-separated assignee emails."), author: Optional[str] = typer.Option(None, "--author", help="Filter by author name (contains, case-insensitive)."), + grep: Optional[str] = typer.Option( + None, + "--grep", + help="Local free-text search across all returned fields (case-insensitive), e.g. --grep jwt.", + ), + contains: Optional[List[str]] = typer.Option( + None, + "--contains", + help="Local field-specific filter in 'field=value' format (case-insensitive). Repeatable and combined with AND, e.g. --contains status=ANALYSIS --contains author=fernando. 
Deep fields (codeSnippet,fileName,vulnerableLine,request,response,url,method,parameters) auto-enable deep search.", + ), + deep_search: bool = typer.Option( + False, + "--deep-search", + help="Force deep field resolution for local search (auto-enabled for deep --contains fields like codeSnippet/fileName/vulnerableLine/request/response/url/method/parameters).", + ), + workers: Optional[int] = typer.Option(None, "--workers", help="Override worker threads for this command (defaults to global --workers)."), + resolve_snippet_urls: bool = typer.Option( + True, + "--resolve-snippet-urls/--no-resolve-snippet-urls", + help="With deep search (manual or auto), fetch snippet file content when codeSnippet is an URL (common in IAC).", + ), page: int = typer.Option(1, "--page", "-p", help="Page number."), per_page: int = typer.Option(50, "--per-page", "-l", help="Items per page."), all_pages: bool = typer.Option(False, "--all", help="Fetch all pages."), @@ -48,9 +73,25 @@ def list_vulnerabilities( ): """List vulnerabilities (issues) for a company, optionally filtered by asset IDs.""" info(f"Listing vulnerabilities for company {company_id} (page {page}, per_page {per_page})...") + started_at = time.perf_counter() + fmt_lower = fmt.lower() SEVERITY_ALLOWED = {"NOTIFICATION", "LOW", "MEDIUM", "HIGH", "CRITICAL"} ATTACK_SURFACE_ALLOWED = {"INTERNET_FACING", "INTERNAL", "NOT_DEFINED"} + DATA_CLASS_ALLOWED = {"PII", "PAYMENT_CARD_INDUSTRY", "NON_SENSITIVE", "NOT_DEFINED"} + BUSINESS_IMPACT_ALLOWED = {"LOW", "MEDIUM", "HIGH", "NOT_DEFINED"} + FAILURE_TYPES_ALLOWED = { + "DAST_FINDING", + "SAST_FINDING", + "MAST_FINDING", + "IAC_FINDING", + "SCA_FINDING", + "CONTAINER_FINDING", + "SECRET_FINDING", + "NETWORK_VULNERABILITY", + "SOURCE_CODE_VULNERABILITY", + "WEB_VULNERABILITY", + } query = """ query Issues($companyId: ID!, $pagination: PaginationInput!, $filters: IssuesFiltersInput) { @@ -80,6 +121,94 @@ def list_vulnerabilities( solution reference impactLevel + detail { package affectedVersion patchedVersion cve fileName } + } + ... on DastFinding { + severity + solution + reference + impactLevel + } + ... on NetworkVulnerability { + severity + solution + reference + impactLevel + } + ... on SourceCodeVulnerability { + severity + solution + reference + impactLevel + detail { vulnerableLine fileName codeSnippet } + } + ... on IacFinding { + severity + solution + reference + impactLevel + detail { vulnerableLine fileName codeSnippet } + } + ... on SecretFinding { + severity + solution + reference + impactLevel + detail { vulnerableLine fileName codeSnippet } + } + ... on WebVulnerability { + severity + solution + reference + impactLevel + detail { url method parameters } + } + ... on ContainerFinding { + severity + solution + reference + impactLevel + detail { package affectedVersion patchedVersion cve } + } + } + metadata { + currentPage + limitValue + totalCount + totalPages + } + } + } + """ + query_with_blob = """ + query Issues($companyId: ID!, $pagination: PaginationInput!, $filters: IssuesFiltersInput) { + issues(companyId: $companyId, pagination: $pagination, filters: $filters) { + collection { + id + title + description + status + type + asset { + name + assetsTagList + company { label } + } + author { name } + assignedUsers { name email } + ... on SastFinding { + severity + solution + reference + impactLevel + detail { vulnerableLine fileName codeSnippetBlob: codeSnippet(blob: true) codeSnippet } + } + ... 
on ScaFinding { + severity + solution + reference + impactLevel + detail { package affectedVersion patchedVersion cve fileName } } ... on DastFinding { severity @@ -98,12 +227,87 @@ def list_vulnerabilities( solution reference impactLevel + detail { vulnerableLine fileName codeSnippetBlob: codeSnippet(blob: true) codeSnippet } + } + ... on IacFinding { + severity + solution + reference + impactLevel + detail { vulnerableLine fileName codeSnippetBlob: codeSnippet(blob: true) codeSnippet } + } + ... on SecretFinding { + severity + solution + reference + impactLevel + detail { vulnerableLine fileName codeSnippetBlob: codeSnippet(blob: true) codeSnippet } } ... on WebVulnerability { severity solution reference impactLevel + detail { url method parameters } + } + ... on ContainerFinding { + severity + solution + reference + impactLevel + detail { package affectedVersion patchedVersion cve } + } + } + metadata { + currentPage + limitValue + totalCount + totalPages + } + } + } + """ + query_light = """ + query Issues($companyId: ID!, $pagination: PaginationInput!, $filters: IssuesFiltersInput) { + issues(companyId: $companyId, pagination: $pagination, filters: $filters) { + collection { + id + title + status + type + asset { + name + assetsTagList + company { label } + } + author { name } + assignedUsers { name email } + ... on SastFinding { + severity + detail { vulnerableLine fileName codeSnippet } + } + ... on ScaFinding { + severity + detail { package affectedVersion patchedVersion cve fileName } + } + ... on DastFinding { severity } + ... on NetworkVulnerability { severity } + ... on SourceCodeVulnerability { + severity + detail { vulnerableLine fileName codeSnippet } + } + ... on IacFinding { + severity + detail { vulnerableLine fileName codeSnippet } + } + ... on SecretFinding { + severity + detail { vulnerableLine fileName codeSnippet } + } + ... on WebVulnerability { severity } + ... on ContainerFinding { + severity + detail { package affectedVersion patchedVersion cve } } } metadata { @@ -115,6 +319,74 @@ def list_vulnerabilities( } } """ + deep_issue_query = """ + query IssueSearchDetail($id: ID!) { + issue(id: $id) { + id + ... on SastFinding { + detail { fileName vulnerableLine codeSnippetBlob: codeSnippet(blob: true) codeSnippet } + } + ... on SourceCodeVulnerability { + detail { fileName vulnerableLine codeSnippetBlob: codeSnippet(blob: true) codeSnippet } + } + ... on IacFinding { + detail { fileName vulnerableLine codeSnippetBlob: codeSnippet(blob: true) codeSnippet } + } + ... on SecretFinding { + detail { fileName vulnerableLine codeSnippetBlob: codeSnippet(blob: true) codeSnippet } + } + ... on ScaFinding { + detail { package affectedVersion patchedVersion cve fileName } + } + ... on ContainerFinding { + detail { package affectedVersion patchedVersion cve } + } + } + } + """ + deep_web_dast_query = """ + query IssueWebDastDetail($id: ID!) { + issue(id: $id) { + id + ... on WebVulnerability { + detail { url method parameters request response } + } + ... on DastFinding { + detail { url method parameters request response } + } + } + } + """ + deep_web_dast_query_top_level = """ + query IssueWebDastDetailTopLevel($id: ID!) { + issue(id: $id) { + id + ... on WebVulnerability { + method + url + request + response + parameters + } + ... on DastFinding { + method + url + request + response + parameters + } + } + } + """ + deep_web_dast_query_fallback = """ + query IssueWebDastDetailFallback($id: ID!) { + issue(id: $id) { + id + ... 
on WebVulnerability { detail { url method parameters } } + ... on DastFinding { detail { url method parameters } } + } + } + """ def _split_ids(value: Optional[str]): if not value: @@ -139,27 +411,40 @@ def _split_strs(value: Optional[str]): filters = {} assets_list = _split_ids(asset_ids) projects_list = _split_ids(project_ids) - severities_list = _split_strs(severities) - if severities_list: - new = [] - for s in severities_list: - up = s.upper() - if up not in SEVERITY_ALLOWED: - error(f"Ignoring invalid severity: {s}") - continue - new.append(up) - severities_list = new or None + severities_list = None + if severities: + try: + severities_list = validate_csv_choices(severities, SEVERITY_ALLOWED, "--severities") + except ValueError as exc: + error(str(exc)) + raise typer.Exit(code=1) asset_tags_list = _split_strs(asset_tags) project_types_list = _split_strs(project_types) if project_types_list: project_types_list = [p.upper() for p in project_types_list] cves_list = _split_strs(cves) - types_list = _split_strs(issue_types) - data_class_list = _split_strs(data_classification) - business_impact_list = _split_strs(business_impact) + types_list = None + if issue_types: + try: + types_list = validate_csv_choices(issue_types, FAILURE_TYPES_ALLOWED, "--types") + except ValueError as exc: + error(str(exc)) + raise typer.Exit(code=1) + data_class_list = None + if data_classification: + try: + data_class_list = validate_csv_choices(data_classification, DATA_CLASS_ALLOWED, "--data-classification") + except ValueError as exc: + error(str(exc)) + raise typer.Exit(code=1) + business_impact_list = None + if business_impact: + try: + business_impact_list = validate_csv_choices(business_impact, BUSINESS_IMPACT_ALLOWED, "--business-impact") + except ValueError as exc: + error(str(exc)) + raise typer.Exit(code=1) assignee_list = _split_strs(assignee_emails) - if business_impact_list: - business_impact_list = [b.upper() for b in business_impact_list] if days_back is not None: if days_back < 0: @@ -202,20 +487,227 @@ def _split_strs(value: Optional[str]): if business_impact_list: filters["businessImpact"] = business_impact_list if exploitability: - up = exploitability.upper() - if up not in ATTACK_SURFACE_ALLOWED: - error(f"Ignoring invalid attack surface: {exploitability}") - else: - filters["exploitability"] = up + try: + up = validate_choice(exploitability, ATTACK_SURFACE_ALLOWED, "--attack-surface") + except ValueError as exc: + error(str(exc)) + raise typer.Exit(code=1) + filters["exploitability"] = up if assignee_list: filters["assigneeEmails"] = assignee_list + author_filter = (author or "").strip().lower() or None + grep_filter = (grep or "").strip().lower() or None + + field_aliases = { + "id": "id", + "title": "title", + "type": "type", + "status": "status", + "severity": "severity_raw", + "asset": "asset", + "tags": "tags", + "author": "author", + "assignee": "assignee", + "company": "company", + "description": "description", + "solution": "solution", + "reference": "reference", + "impactlevel": "impactLevel", + "filename": "fileName", + "vulnerableline": "vulnerableLine", + "codesnippet": "codeSnippet", + "snippet": "codeSnippet", + "code": "codeSnippet", + "file": "fileName", + "line": "vulnerableLine", + "package": "package", + "affectedversion": "affectedVersion", + "patchedversion": "patchedVersion", + "cve": "cve", + "url": "url", + "method": "method", + "request": "request", + "response": "response", + "parameters": "parameters", + "params": "parameters", + } + contains_filters: 
List[Tuple[str, str]] = [] + contains_values_by_field: dict[str, list[str]] = {} + for raw in contains or []: + entry = (raw or "").strip() + if not entry: + continue + if "=" not in entry: + warning(f"Ignoring invalid --contains entry '{entry}'. Use field=value.") + continue + field_raw, value_raw = entry.split("=", 1) + field_key = field_aliases.get(field_raw.strip().lower()) + if not field_key: + warning(f"Ignoring unknown --contains field '{field_raw}'.") + continue + value = value_raw.strip().lower() + if not value: + warning(f"Ignoring empty --contains value for field '{field_raw}'.") + continue + contains_filters.append((field_key, value)) + contains_values_by_field.setdefault(field_key, []).append(value) + + if all_pages and per_page < 200: + per_page = 200 + info("Using larger page size (200) for faster retrieval with --all.") + variables = { "companyId": str(company_id), "pagination": {"page": page, "perPage": per_page}, "filters": filters or None, } - author_filter = (author or "").strip().lower() or None + + include_detail_raw = bool(grep_filter) + search_trace_enabled = bool(grep_filter or contains_filters) + trace_fields = [ + "id", + "title", + "type", + "status", + "severity_raw", + "asset", + "tags", + "author", + "assignee", + "company", + "description", + "solution", + "reference", + "impactLevel", + "fileName", + "vulnerableLine", + "codeSnippet", + "package", + "affectedVersion", + "patchedVersion", + "cve", + "url", + "method", + "request", + "response", + "parameters", + "snippetContent", + ] + + def _matches_local_filters(row: dict) -> bool: + if grep_filter: + haystack = str(row.get("_grep_blob") or "").lower() + if grep_filter not in haystack: + return False + for field_key, value in contains_filters: + current = str(row.get(field_key) or "").lower() + if field_key == "codeSnippet": + # Fast-path: avoid expensive row-wide joins on huge datasets. 
+ fallback = " ".join([current, str(row.get("package") or ""), str(row.get("snippetContent") or "")]).lower() + if value not in current and value not in fallback: + return False + continue + if value not in current: + return False + return True + + def _matched_fields(row: dict) -> list[str]: + matched: list[str] = [] + if grep_filter: + for key in trace_fields: + if grep_filter in str(row.get(key) or "").lower(): + matched.append(key) + for field_key, value in contains_filters: + current = str(row.get(field_key) or "").lower() + if field_key == "codeSnippet": + fallback = " ".join([current, str(row.get("package") or ""), str(row.get("snippetContent") or "")]).lower() + if value in current or value in fallback: + matched.append(field_key) + continue + if value in current: + matched.append(field_key) + # Deduplicate while preserving order + seen = set() + ordered = [] + for item in matched: + if item in seen: + continue + seen.add(item) + ordered.append(item) + return ordered + + auto_deep_fields = {"codeSnippet", "fileName", "vulnerableLine", "request", "response", "url", "method", "parameters"} + auto_deep_requested = sorted(set(contains_values_by_field.keys()) & auto_deep_fields) + effective_deep_search = bool(deep_search or auto_deep_requested) + if auto_deep_requested and not deep_search: + info(f"Auto deep-search enabled for --contains fields: {', '.join(auto_deep_requested)}.") + + code_terms = contains_values_by_field.get("codeSnippet", []) + deep_contains_filters = { + key: values + for key, values in contains_values_by_field.items() + if key in auto_deep_fields + } + requires_deep_for_fields = bool(effective_deep_search and deep_contains_filters) + requires_deep_for_codesnippet = bool(code_terms and effective_deep_search) + use_blob_snippet_query = requires_deep_for_codesnippet + if use_blob_snippet_query: + info("Deep search for codeSnippet enabled: requesting snippet blob inline in paginated query for better performance.") + snippet_content_cache: dict[str, str] = {} + + def _fetch_deep_issue_detail(issue: dict) -> dict: + issue_id = str(issue.get("id") or "") + issue_type = str(issue.get("type") or "").upper() + if not issue_id: + return {} + is_web_dast = issue_type in {"WEB_VULNERABILITY", "DAST_FINDING"} + try: + if is_web_dast: + data = None + for q in (deep_web_dast_query, deep_web_dast_query_top_level, deep_web_dast_query_fallback): + try: + data = graphql_request(q, {"id": issue_id}, log_request=True, verbose_only=True) + break + except Exception: + continue + if not data: + return {} + web_issue = data.get("issue") or {} + detail = web_issue.get("detail") or {} + return { + "url": web_issue.get("url") or detail.get("url"), + "method": web_issue.get("method") or detail.get("method"), + "request": web_issue.get("request") or detail.get("request"), + "response": web_issue.get("response") or detail.get("response"), + "parameters": web_issue.get("parameters") or detail.get("parameters"), + } + + data = graphql_request(deep_issue_query, {"id": issue_id}, log_request=True, verbose_only=True) + issue_data = data.get("issue") or {} + return issue_data.get("detail") or {} + except Exception: + return {} + + def _looks_like_url(value: str) -> bool: + v = (value or "").strip().lower() + return v.startswith("http://") or v.startswith("https://") + + def _fetch_snippet_from_url(url: str) -> str: + if not url: + return "" + cached = snippet_content_cache.get(url) + if cached is not None: + return cached + try: + resp = requests.get(url, timeout=8) + resp.raise_for_status() + text = 
(resp.text or "")[:300000] + snippet_content_cache[url] = text + return text + except Exception: + snippet_content_cache[url] = "" + return "" try: fetch_all = all_pages # Respect user pagination choices for all formats @@ -223,36 +715,21 @@ def _split_strs(value: Optional[str]): rows = [] total_count = 0 total_pages = None + max_pages = 200 - last_signature = None - while True: - variables["pagination"]["page"] = current_page - # Suppress request spam unless user passed --verbose - data = graphql_request(query, variables, log_request=True, verbose_only=True) + def _fetch_issues_page(page_num: int): + vars_page = { + "companyId": variables["companyId"], + "pagination": {"page": page_num, "perPage": per_page}, + "filters": variables.get("filters"), + } + issues_query = query_with_blob if use_blob_snippet_query else query + data = graphql_request(issues_query, vars_page, log_request=True, verbose_only=True) issues = data["issues"] - collection = issues.get("collection") or [] - metadata = issues.get("metadata") or {} - total_pages = metadata.get("totalPages") - total_count = metadata.get("totalCount", total_count) - # If the API did not return totalPages but returned totalCount, compute it - if not total_pages and total_count and per_page: - total_pages = (total_count + per_page - 1) // per_page - # Fallback safety cap to avoid infinite loops in case pagination metadata is missing - max_pages = 200 - - if not collection: - if current_page == page: - typer.echo("⚠️ No vulnerabilities found.") - raise typer.Exit() - break - - # Detect repeated pages when the API ignores pagination to avoid infinite loops - signature = (collection[0].get("id"), collection[-1].get("id")) - if last_signature == signature: - error("Pagination appears to be repeating the same results; stopping early to avoid a loop.") - break - last_signature = signature + return page_num, (issues.get("collection") or []), (issues.get("metadata") or {}) + def _build_page_rows(collection: list) -> list: + page_rows = [] for vuln in collection: asset = vuln.get("asset") or {} tags = ", ".join(asset.get("assetsTagList") or []) @@ -270,14 +747,14 @@ def _split_strs(value: Optional[str]): } sev_display = severity_value sev_style = sev_color_map.get(severity_value.upper(), None) - if sev_style: + if fmt_lower == "table" and sev_style: sev_display = f"[{sev_style}]{severity_value}[/{sev_style}]" assignee = "" assigned = vuln.get("assignedUsers") or [] if assigned: assignee = assigned[0].get("email") or assigned[0].get("name") or "" - rows.append({ + row = { "id": vuln.get("id"), "title": vuln.get("title"), "type": vuln.get("type"), @@ -296,25 +773,194 @@ def _split_strs(value: Optional[str]): # SAST detail "fileName": (vuln.get("detail") or {}).get("fileName"), "vulnerableLine": (vuln.get("detail") or {}).get("vulnerableLine"), - "codeSnippet": (vuln.get("detail") or {}).get("codeSnippet"), - }) + "codeSnippet": (vuln.get("detail") or {}).get("codeSnippetBlob") or (vuln.get("detail") or {}).get("codeSnippet"), + "package": (vuln.get("detail") or {}).get("package"), + "affectedVersion": (vuln.get("detail") or {}).get("affectedVersion"), + "patchedVersion": (vuln.get("detail") or {}).get("patchedVersion"), + "cve": (vuln.get("detail") or {}).get("cve"), + "url": (vuln.get("detail") or {}).get("url"), + "method": (vuln.get("detail") or {}).get("method"), + "request": (vuln.get("detail") or {}).get("request"), + "response": (vuln.get("detail") or {}).get("response"), + "parameters": (vuln.get("detail") or {}).get("parameters"), + "detailRaw": 
json.dumps(vuln.get("detail") or {}, ensure_ascii=False) if include_detail_raw else "", + "matchedIn": "", + } + if grep_filter: + row["_grep_blob"] = " ".join( + [ + str(row.get("id") or ""), + str(row.get("title") or ""), + str(row.get("type") or ""), + str(row.get("status") or ""), + str(row.get("severity_raw") or ""), + str(row.get("asset") or ""), + str(row.get("tags") or ""), + str(row.get("author") or ""), + str(row.get("assignee") or ""), + str(row.get("company") or ""), + str(row.get("description") or ""), + str(row.get("solution") or ""), + str(row.get("reference") or ""), + str(row.get("impactLevel") or ""), + str(row.get("fileName") or ""), + str(row.get("vulnerableLine") or ""), + str(row.get("codeSnippet") or ""), + str(row.get("package") or ""), + str(row.get("url") or ""), + str(row.get("method") or ""), + str(row.get("request") or ""), + str(row.get("response") or ""), + str(row.get("parameters") or ""), + str(row.get("affectedVersion") or ""), + str(row.get("patchedVersion") or ""), + str(row.get("cve") or ""), + str(row.get("detailRaw") or ""), + ] + ) + page_rows.append(row) + return page_rows + + def _field_needs_deep_lookup(row: dict, field_key: str, values: list[str]) -> bool: + current = str(row.get(field_key) or "").lower() + if field_key == "codeSnippet": + # URL means we still need detail/blob or fallback download. + if _looks_like_url(current): + return True + fallback = " ".join([current, str(row.get("package") or ""), str(row.get("snippetContent") or "")]).lower() + return any(term not in fallback for term in values) + return any(term not in current for term in values) + + def _enrich_page_rows(page_rows: list): + if requires_deep_for_fields and page_rows: + missing_rows = [] + for row in page_rows: + if any(_field_needs_deep_lookup(row, field_key, values) for field_key, values in deep_contains_filters.items()): + missing_rows.append(row) + if missing_rows: + max_workers = resolve_workers(workers) + detail_list = parallel_map( + _fetch_deep_issue_detail, + missing_rows, + workers=max_workers, + ) + for row, deep_detail in zip(missing_rows, detail_list): + if not deep_detail: + continue + snippet_blob = deep_detail.get("codeSnippetBlob") + snippet_default = deep_detail.get("codeSnippet") + row["fileName"] = deep_detail.get("fileName") or row.get("fileName") + row["vulnerableLine"] = deep_detail.get("vulnerableLine") or row.get("vulnerableLine") + row["codeSnippet"] = snippet_blob or snippet_default or row.get("codeSnippet") + row["package"] = deep_detail.get("package") or row.get("package") + row["affectedVersion"] = deep_detail.get("affectedVersion") or row.get("affectedVersion") + row["patchedVersion"] = deep_detail.get("patchedVersion") or row.get("patchedVersion") + row["cve"] = deep_detail.get("cve") or row.get("cve") + row["url"] = deep_detail.get("url") or row.get("url") + row["method"] = deep_detail.get("method") or row.get("method") + row["request"] = deep_detail.get("request") or row.get("request") + row["response"] = deep_detail.get("response") or row.get("response") + row["parameters"] = deep_detail.get("parameters") or row.get("parameters") + if include_detail_raw: + row["detailRaw"] = json.dumps(deep_detail, ensure_ascii=False) + if grep_filter: + row["_grep_blob"] = f"{row.get('_grep_blob') or ''} {row.get('codeSnippet') or ''} {row.get('fileName') or ''} {row.get('vulnerableLine') or ''} {row.get('package') or ''} {row.get('url') or ''} {row.get('method') or ''} {row.get('request') or ''} {row.get('response') or ''} {row.get('parameters') or 
''}" + + if resolve_snippet_urls and missing_rows: + url_rows = [] + for row in missing_rows: + cs = str(row.get("codeSnippet") or "") + if _looks_like_url(cs): + fallback = f"{cs} {row.get('package') or ''}".lower() + if any(term not in fallback for term in code_terms): + url_rows.append(row) + if url_rows: + max_workers = resolve_workers(workers) + contents = parallel_map( + lambda r: _fetch_snippet_from_url(str(r.get("codeSnippet") or "")), + url_rows, + workers=max_workers, + ) + for row, content in zip(url_rows, contents): + if not content: + continue + row["snippetContent"] = content + if include_detail_raw: + row["detailRaw"] = f"{row.get('detailRaw') or ''}\n{content[:120000]}" + if grep_filter: + row["_grep_blob"] = f"{row.get('_grep_blob') or ''} {content[:120000]}" + + def _append_filtered_rows(page_rows: list): + for row in page_rows: + if _matches_local_filters(row): + if search_trace_enabled: + matches = _matched_fields(row) + row["matchedIn"] = ", ".join(matches[:6]) if matches else "unknown" + rows.append(row) + + page_num, collection, metadata = _fetch_issues_page(current_page) + total_pages = metadata.get("totalPages") + total_count = metadata.get("totalCount", total_count) + if not total_pages and total_count and per_page: + total_pages = (total_count + per_page - 1) // per_page + + if not collection: + typer.echo("⚠️ No vulnerabilities found.") + raise typer.Exit() - # Stop when not fetching all, or when we reach the last page, - # or when the API returns fewer items than requested (safety stop). - if not fetch_all: - break - if total_pages is not None and current_page >= total_pages: - break - if len(collection) < per_page: - break - if total_pages is None: - # No pagination metadata; apply a hard cap to prevent infinite loops. 
- if current_page >= max_pages: - error("Stopping pagination early to avoid infinite loop (missing metadata).") - break - current_page += 1 - if fmt.lower() == "sarif": - sarif = _to_sarif(rows) + page_rows = _build_page_rows(collection) + _enrich_page_rows(page_rows) + _append_filtered_rows(page_rows) + + if fetch_all: + can_parallel_pages = bool(total_pages is not None and current_page < total_pages) + if can_parallel_pages: + page_numbers = list(range(current_page + 1, total_pages + 1)) + max_workers = resolve_workers(workers) + page_results = parallel_map(_fetch_issues_page, page_numbers, workers=max_workers) + for _, next_collection, _ in sorted(page_results, key=lambda x: x[0]): + if not next_collection: + continue + next_rows = _build_page_rows(next_collection) + _enrich_page_rows(next_rows) + _append_filtered_rows(next_rows) + else: + last_signature = (collection[0].get("id"), collection[-1].get("id")) + while True: + current_page += 1 + if total_pages is not None and current_page > total_pages: + break + if total_pages is None and current_page > max_pages: + error("Stopping pagination early to avoid infinite loop (missing metadata).") + break + + _, next_collection, next_metadata = _fetch_issues_page(current_page) + if not next_collection: + break + + signature = (next_collection[0].get("id"), next_collection[-1].get("id")) + if last_signature == signature: + error("Pagination appears to be repeating the same results; stopping early to avoid a loop.") + break + last_signature = signature + + if total_pages is None: + maybe_total_pages = next_metadata.get("totalPages") + maybe_total_count = next_metadata.get("totalCount") + if maybe_total_count: + total_count = maybe_total_count + if maybe_total_pages: + total_pages = maybe_total_pages + + next_rows = _build_page_rows(next_collection) + _enrich_page_rows(next_rows) + _append_filtered_rows(next_rows) + + if total_pages is None and len(next_collection) < per_page: + break + if fmt_lower == "sarif": + export_rows = [{k: v for k, v in r.items() if not str(k).startswith("_")} for r in rows] + sarif = _to_sarif(export_rows) sarif_json = json.dumps(sarif, indent=2) if output: with open(output, "w", encoding="utf-8") as f: @@ -322,11 +968,12 @@ def _split_strs(value: Optional[str]): summary(f"SARIF exported to {output}") else: print(sarif_json) - summary(f"{len(rows)} vulnerability(ies) listed out of {total_count or len(rows)}.") + elapsed = time.perf_counter() - started_at + timed_summary(f"{len(rows)} vulnerability(ies) listed out of {total_count or len(rows)}", elapsed) else: - # Align output fields with the schema to avoid DictWriter errors on extra keys. - output_rows = rows - if schema and hasattr(schema, "display_headers"): + # Keep full payload for JSON; constrain fields for table/csv to schema columns. 
+ output_rows = [{k: v for k, v in r.items() if not str(k).startswith("_")} for r in rows] + if fmt_lower in {"table", "csv"} and schema and hasattr(schema, "display_headers"): display_keys = list(schema.display_headers.keys()) output_rows = [{k: r.get(k, "") for k in display_keys} for r in rows] export_data( @@ -336,7 +983,8 @@ def _split_strs(value: Optional[str]): output=output, title=f"Vulnerabilities (Company {company_id}) - Page {page}/{total_pages or '?'}", ) - summary(f"{len(rows)} vulnerability(ies) listed out of {total_count or len(rows)}.") + elapsed = time.perf_counter() - started_at + timed_summary(f"{len(rows)} vulnerability(ies) listed out of {total_count or len(rows)}", elapsed) except Exception as e: error(f"Error listing vulnerabilities: {e}") raise typer.Exit(code=1) @@ -389,6 +1037,11 @@ def sev_to_level(sev: str): "fileName": r.get("fileName"), "vulnerableLine": r.get("vulnerableLine"), "codeSnippet": r.get("codeSnippet"), + "url": r.get("url"), + "method": r.get("method"), + "request": r.get("request"), + "response": r.get("response"), + "parameters": r.get("parameters"), }, }) @@ -507,6 +1160,7 @@ def vulnerability_timeline( info(f"Listing timeline for vulnerability {issue_id}...") else: info(f"Listing timeline for vulnerabilities in project {project_id} (company {company_id})...") + started_at = time.perf_counter() status_filter = status.strip().upper() if status else None email_filter = (user_email or "").strip().lower() or None @@ -580,6 +1234,24 @@ def _fetch_project_issues(cid: int, pid: int) -> list[dict]: current_page += 1 return out + def _fetch_issue_timeline(issue_stub: dict) -> tuple[dict, list[dict]]: + current_issue_id = issue_stub.get("id") + if not current_issue_id: + return {}, [] + try: + data = graphql_request( + issue_timeline_query, + {"id": str(current_issue_id)}, + log_request=True, + verbose_only=True, + ) + except Exception: + return issue_stub, [] + issue = data.get("issue") or {} + if not issue: + return issue_stub, [] + return issue, issue.get("history") or [] + try: rows = [] target_issues = [] @@ -591,16 +1263,11 @@ def _fetch_project_issues(cid: int, pid: int) -> list[dict]: warning("No vulnerabilities found for the given project.") raise typer.Exit() - for target in target_issues: - current_issue_id = target.get("id") - if not current_issue_id: - continue - - data = graphql_request(issue_timeline_query, {"id": str(current_issue_id)}, log_request=True, verbose_only=True) - issue = data.get("issue") + issue_histories = parallel_map(_fetch_issue_timeline, target_issues) + for issue, history_rows in issue_histories: if not issue: continue - history_rows = issue.get("history") or [] + current_issue_id = issue.get("id") for h in history_rows: action_type = (h.get("action") or "").upper() @@ -631,8 +1298,8 @@ def _fetch_project_issues(cid: int, pid: int) -> list[dict]: rows.append({ "projectId": str(project_id) if project_id is not None else "", "issueId": issue.get("id") or current_issue_id, - "issueTitle": issue.get("title") or target.get("title") or "", - "currentIssueStatus": issue.get("status") or target.get("status") or "", + "issueTitle": issue.get("title") or "", + "currentIssueStatus": issue.get("status") or "", "eventId": h.get("eventId") or "", "createdAt": created_at, "actorName": actor_name, @@ -710,7 +1377,8 @@ def _fetch_project_issues(cid: int, pid: int) -> list[dict]: output=output, title=f"Project {project_id} - Last Status Change Per Vulnerability", ) - summary(f"{len(latest_rows)} vulnerability(ies) with last status-change 
listed.") + elapsed = time.perf_counter() - started_at + timed_summary(f"{len(latest_rows)} vulnerability(ies) with last status-change listed", elapsed) return export_data( @@ -720,7 +1388,8 @@ def _fetch_project_issues(cid: int, pid: int) -> list[dict]: output=output, title=f"Vulnerability {issue_id} - Last Status Change", ) - summary("1 last status-change event listed.") + elapsed = time.perf_counter() - started_at + timed_summary("1 last status-change event listed", elapsed) return if project_id is not None: @@ -739,7 +1408,8 @@ def _fetch_project_issues(cid: int, pid: int) -> list[dict]: output=output, title=f"Vulnerability {issue_id} - Timeline", ) - summary(f"{len(rows)} timeline event(s) listed.") + elapsed = time.perf_counter() - started_at + timed_summary(f"{len(rows)} timeline event(s) listed", elapsed) except typer.Exit: raise diff --git a/src/conviso/core/concurrency.py b/src/conviso/core/concurrency.py new file mode 100644 index 0000000..092cfb4 --- /dev/null +++ b/src/conviso/core/concurrency.py @@ -0,0 +1,42 @@ +""" +Concurrency helpers +------------------- +Shared primitives for parallel execution across commands. +""" + +from concurrent.futures import ThreadPoolExecutor +from typing import Callable, Iterable, TypeVar, List, Optional + +T = TypeVar("T") +R = TypeVar("R") +DEFAULT_WORKERS = 8 + + +def set_default_workers(workers: int): + global DEFAULT_WORKERS + DEFAULT_WORKERS = workers if workers and workers > 0 else 1 + + +def get_default_workers() -> int: + return DEFAULT_WORKERS + + +def resolve_workers(workers: Optional[int]) -> int: + if workers is None: + return get_default_workers() + return workers if workers > 0 else 1 + + +def parallel_map(func: Callable[[T], R], items: Iterable[T], workers: Optional[int] = None) -> List[R]: + """ + Apply func to items in parallel (I/O bound), preserving order. + Falls back to sequential execution when workers <= 1. + """ + data = list(items) + if not data: + return [] + max_workers = resolve_workers(workers) + if max_workers <= 1: + return [func(item) for item in data] + with ThreadPoolExecutor(max_workers=max_workers) as pool: + return list(pool.map(func, data)) diff --git a/src/conviso/core/notifier.py b/src/conviso/core/notifier.py index 440dd45..ddce42c 100644 --- a/src/conviso/core/notifier.py +++ b/src/conviso/core/notifier.py @@ -25,3 +25,29 @@ def summary(message, error_count=0): typer.echo(f"🧾 {str(message)}") if error_count: typer.echo(f"⚠️ {error_count} error(s) occurred.") + + +def format_duration(seconds: float) -> str: + """ + Human-friendly duration formatting. 
+ Examples: 0.42s -> 420ms, 12.3s -> 12.30s, 1075.39s -> 17m 55.39s + """ + try: + total = float(seconds) + except Exception: + return f"{seconds}s" + if total < 1: + return f"{int(total * 1000)}ms" + hours = int(total // 3600) + total -= hours * 3600 + minutes = int(total // 60) + total -= minutes * 60 + if hours > 0: + return f"{hours}h {minutes}m {total:.2f}s" + if minutes > 0: + return f"{minutes}m {total:.2f}s" + return f"{total:.2f}s" + + +def timed_summary(message_without_time: str, elapsed_seconds: float, error_count: int = 0): + summary(f"{message_without_time} in {format_duration(elapsed_seconds)}.", error_count=error_count) diff --git a/src/conviso/core/output_manager.py b/src/conviso/core/output_manager.py index 844c1b3..05c99b8 100644 --- a/src/conviso/core/output_manager.py +++ b/src/conviso/core/output_manager.py @@ -15,7 +15,8 @@ from typing import List, Dict, Any, Optional from rich.console import Console from rich.table import Table -from conviso.core.notifier import info, success, error +from conviso.core.notifier import info, success, error, warning +from conviso.core.output_prefs import get_repeat_header_every, get_selected_columns console = Console() @@ -34,9 +35,32 @@ def export_data(data: List[Dict], schema=None, fmt: str = "table", output: str = console.print("[yellow]⚠️ No data to export.[/yellow]") return + selected = get_selected_columns() or [] + if selected: + key_by_lower = {k.lower(): k for k in field_keys} + label_by_lower = {str(label).lower(): key for key, label in zip(field_keys, columns)} + chosen_keys = [] + unknown = [] + for raw in selected: + token = raw.strip().lower() + if not token: + continue + key = key_by_lower.get(token) or label_by_lower.get(token) + if key and key not in chosen_keys: + chosen_keys.append(key) + elif not key: + unknown.append(raw) + if chosen_keys: + field_keys = chosen_keys + columns = [schema.display_headers.get(k, k) if schema and hasattr(schema, "display_headers") else k for k in field_keys] + if unknown: + warning(f"Ignoring unknown column(s): {', '.join(unknown)}") + + filtered_data = [{k: row.get(k, "") for k in field_keys} for row in data] + # --- JSON output --- if fmt == "json": - result = json.dumps(data, indent=2, ensure_ascii=False) + result = json.dumps(filtered_data, indent=2, ensure_ascii=False) # If output file is specified, save to disk if output: @@ -54,19 +78,19 @@ def export_data(data: List[Dict], schema=None, fmt: str = "table", output: str = with open(output, "w", newline="", encoding="utf-8") as csvfile: writer = csv.DictWriter(csvfile, fieldnames=field_keys) writer.writeheader() - writer.writerows(data) + writer.writerows(filtered_data) console.print(f"[green]File saved to {output}[/green]") else: writer = csv.DictWriter(sys.stdout, fieldnames=field_keys) writer.writeheader() - writer.writerows(data) + writer.writerows(filtered_data) return # --- TABLE output --- else: # Detect numeric columns to right-align def _is_numeric_column(key: str) -> bool: - for row in data: + for row in filtered_data: val = row.get(key) if val is None or val == "": continue @@ -77,26 +101,34 @@ def _is_numeric_column(key: str) -> bool: return True numeric_cols = {k: _is_numeric_column(k) for k in field_keys} - - table = Table( - title=title or "Results", - show_header=True, - header_style="bold cyan", - row_styles=["none", "dim"], # zebra striping for readability - ) - for col_key, col_name in zip(field_keys, columns): - table.add_column( - col_name, - overflow="ellipsis", - max_width=25, - justify="right" if 
numeric_cols.get(col_key) else "left", - no_wrap=False, + repeat_every = get_repeat_header_every() + + def _build_table(chunk: List[Dict], table_title: Optional[str] = None): + table = Table( + title=table_title or "Results", + show_header=True, + header_style="bold cyan", + row_styles=["none", "dim"], ) - - for row in data: - table.add_row(*(str(row.get(k, "")) for k in field_keys)) - - console.print(table) + for col_key, col_name in zip(field_keys, columns): + table.add_column( + col_name, + overflow="ellipsis", + max_width=25, + justify="right" if numeric_cols.get(col_key) else "left", + no_wrap=False, + ) + for row in chunk: + table.add_row(*(str(row.get(k, "")) for k in field_keys)) + return table + + if repeat_every and repeat_every > 0 and len(filtered_data) > repeat_every: + for i in range(0, len(filtered_data), repeat_every): + chunk = filtered_data[i:i + repeat_every] + chunk_title = title if i == 0 else "" + console.print(_build_table(chunk, chunk_title)) + else: + console.print(_build_table(filtered_data, title or "Results")) diff --git a/src/conviso/core/output_prefs.py b/src/conviso/core/output_prefs.py new file mode 100644 index 0000000..1e98922 --- /dev/null +++ b/src/conviso/core/output_prefs.py @@ -0,0 +1,29 @@ +""" +Global output preferences +------------------------- +Stores CLI-wide output options configured in app callback. +""" + +from typing import Optional, List + +REPEAT_HEADER_EVERY = 0 +SELECTED_COLUMNS: Optional[List[str]] = None + + +def set_output_preferences(repeat_header_every: int = 0, columns: Optional[str] = None): + global REPEAT_HEADER_EVERY, SELECTED_COLUMNS + REPEAT_HEADER_EVERY = repeat_header_every if repeat_header_every and repeat_header_every > 0 else 0 + if columns: + parsed = [c.strip() for c in columns.split(",") if c.strip()] + SELECTED_COLUMNS = parsed or None + else: + SELECTED_COLUMNS = None + + +def get_repeat_header_every() -> int: + return REPEAT_HEADER_EVERY + + +def get_selected_columns() -> Optional[List[str]]: + return SELECTED_COLUMNS + diff --git a/src/conviso/core/validators.py b/src/conviso/core/validators.py new file mode 100644 index 0000000..9dac1cb --- /dev/null +++ b/src/conviso/core/validators.py @@ -0,0 +1,69 @@ +""" +Input validators for fixed CLI choices. +""" + +from __future__ import annotations + +import difflib +from typing import Iterable, Optional, List + + +def _suggest(value: str, allowed: Iterable[str]) -> Optional[str]: + options = sorted({str(a).upper() for a in allowed}) + matches = difflib.get_close_matches(str(value).upper(), options, n=1, cutoff=0.6) + return matches[0] if matches else None + + +def validate_choice(value: Optional[str], allowed: Iterable[str], param_name: str) -> Optional[str]: + """ + Validate a single fixed choice (case-insensitive) and return normalized upper value. + Raises ValueError with user-friendly guidance. + """ + if value is None: + return None + up = str(value).strip().upper() + allowed_set = {str(a).upper() for a in allowed} + if up in allowed_set: + return up + hint = _suggest(up, allowed_set) + if hint: + raise ValueError( + f"Invalid value for {param_name}: '{value}' (did you mean '{hint}'?). " + f"Allowed values: {', '.join(sorted(allowed_set))}" + ) + raise ValueError( + f"Invalid value for {param_name}: '{value}'. 
" + f"Allowed values: {', '.join(sorted(allowed_set))}" + ) + + +def validate_csv_choices(value: Optional[str], allowed: Iterable[str], param_name: str) -> Optional[List[str]]: + """ + Validate comma-separated fixed choices (case-insensitive) and return normalized upper list. + Raises ValueError with user-friendly guidance. + """ + if not value: + return None + allowed_set = {str(a).upper() for a in allowed} + parts = [p.strip() for p in str(value).split(",") if p.strip()] + normalized: List[str] = [] + invalid: List[str] = [] + hints: List[str] = [] + for part in parts: + up = part.upper() + if up in allowed_set: + normalized.append(up) + continue + invalid.append(part) + maybe = _suggest(up, allowed_set) + if maybe: + hints.append(f"'{part}' (did you mean '{maybe}'?)") + else: + hints.append(f"'{part}'") + if invalid: + raise ValueError( + f"Invalid value(s) for {param_name}: {', '.join(hints)}. " + f"Allowed values: {', '.join(sorted(allowed_set))}" + ) + return normalized or None + diff --git a/src/conviso/schemas/vulnerabilities_schema.py b/src/conviso/schemas/vulnerabilities_schema.py index 780ca62..15933be 100644 --- a/src/conviso/schemas/vulnerabilities_schema.py +++ b/src/conviso/schemas/vulnerabilities_schema.py @@ -21,6 +21,7 @@ def __init__(self): "author", "assignee", "company", + "matchedIn", ] self.display_headers: Dict[str, str] = { @@ -34,6 +35,7 @@ def __init__(self): "author": "Author", "assignee": "Assignee", "company": "Company", + "matchedIn": "Matched In", } def display_name(self, field: str) -> str: