Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ conviso --help
- Vulnerabilities (last 7 days): `python -m conviso.app vulns list --company-id 443 --days-back 7 --severities HIGH,CRITICAL --all`
- Vulnerabilities by author: `python -m conviso.app vulns list --company-id 443 --author "Fernando" --all`
- Vulnerability timeline (by vulnerability ID): `python -m conviso.app vulns timeline --id 12345`
- Vulnerabilities timeline by project: `python -m conviso.app vulns timeline --company-id 443 --project-id 26102`
- Last user who changed vuln status: `python -m conviso.app vulns timeline --id 12345 --last-status-change-only`
- Last user who changed status per vuln in a project: `python -m conviso.app vulns timeline --company-id 443 --project-id 26102 --last-status-change-only`
- Last user who changed vuln status to ANALYSIS: `python -m conviso.app vulns timeline --id 12345 --status ANALYSIS --last-status-change-only`

Output options: `--format table|json|csv`, `--output path` to save JSON/CSV.
Expand Down
2 changes: 1 addition & 1 deletion src/conviso/VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.3.2
0.3.3
237 changes: 190 additions & 47 deletions src/conviso/commands/vulnerabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from conviso.clients.client_graphql import graphql_request
from conviso.core.output_manager import export_data
from conviso.schemas.vulnerabilities_schema import schema
from conviso.schemas.vulnerabilities_timeline_schema import timeline_schema, timeline_last_schema

app = typer.Typer(help="List and manage vulnerabilities (WEB, NETWORK, SOURCE).")

Expand Down Expand Up @@ -478,7 +479,9 @@ def _extract_status_change_fields(history_item: dict) -> tuple[str, str, str]:

@app.command("timeline", help="Show vulnerability timeline/history and filter by actor/status.")
def vulnerability_timeline(
issue_id: int = typer.Option(..., "--id", "-i", help="Vulnerability/issue ID."),
issue_id: Optional[int] = typer.Option(None, "--id", "-i", help="Vulnerability/issue ID."),
company_id: Optional[int] = typer.Option(None, "--company-id", "-c", help="Company ID (required with --project-id)."),
project_id: Optional[int] = typer.Option(None, "--project-id", "-P", help="Project ID to aggregate timelines from related vulnerabilities."),
user_email: Optional[str] = typer.Option(None, "--user-email", help="Filter by actor email or name (contains, case-insensitive)."),
status: Optional[str] = typer.Option(None, "--status", help="Filter status-change events by target status (IssueStatusLabel)."),
history_start: Optional[str] = typer.Option(None, "--history-start", help="History created_at >= this value (YYYY-MM-DD or ISO-8601)."),
Expand All @@ -487,14 +490,30 @@ def vulnerability_timeline(
fmt: str = typer.Option("table", "--format", "-f", help="Output format: table, json, csv."),
output: Optional[str] = typer.Option(None, "--output", "-o", help="Output file for json/csv."),
):
info(f"Listing timeline for vulnerability {issue_id}...")
if issue_id is not None and project_id is not None:
error("Use either --id or --project-id, not both.")
raise typer.Exit(code=1)
if issue_id is None and project_id is None:
error("Provide --id or --project-id (with --company-id).")
raise typer.Exit(code=1)
if project_id is not None and company_id is None:
error("--company-id is required when using --project-id.")
raise typer.Exit(code=1)
if company_id is not None and project_id is None and issue_id is None:
error("--company-id alone is not enough; use with --project-id or provide --id.")
raise typer.Exit(code=1)

if issue_id is not None:
info(f"Listing timeline for vulnerability {issue_id}...")
else:
info(f"Listing timeline for vulnerabilities in project {project_id} (company {company_id})...")

status_filter = status.strip().upper() if status else None
email_filter = (user_email or "").strip().lower() or None
history_start_dt = _parse_dt_filter(history_start, end_of_day=False)
history_end_dt = _parse_dt_filter(history_end, end_of_day=True)

query = """
issue_timeline_query = """
query IssueTimeline($id: ID!) {
issue(id: $id) {
id
Expand All @@ -515,54 +534,114 @@ def vulnerability_timeline(
}
"""

try:
data = graphql_request(query, {"id": str(issue_id)})
issue = data.get("issue")
if not issue:
warning(f"Issue {issue_id} not found.")
raise typer.Exit(code=1)
history_rows = issue.get("history") or []
project_issues_query = """
query IssuesByProject($companyId: ID!, $pagination: PaginationInput!, $filters: IssuesFiltersInput) {
issues(companyId: $companyId, pagination: $pagination, filters: $filters) {
collection {
id
title
status
}
metadata {
currentPage
totalPages
totalCount
}
}
}
"""

def _fetch_project_issues(cid: int, pid: int) -> list[dict]:
current_page = 1
per_page = 100
out = []
while True:
data = graphql_request(
project_issues_query,
{
"companyId": str(cid),
"pagination": {"page": current_page, "perPage": per_page},
"filters": {"projectIds": [pid]},
},
log_request=True,
verbose_only=True,
)
issues = (data.get("issues") or {})
collection = issues.get("collection") or []
metadata = issues.get("metadata") or {}
total_pages = metadata.get("totalPages")
out.extend(collection)
if not collection:
break
if total_pages is not None and current_page >= total_pages:
break
if len(collection) < per_page:
break
current_page += 1
return out

try:
rows = []
for h in history_rows:
action_type = (h.get("action") or "").upper()
actor_email = (h.get("authorEmail") or "").strip()
actor_name = actor_email.split("@", 1)[0] if actor_email else ""
created_at = h.get("at") or ""
created_at_dt = _safe_parse_iso(created_at)
from_status = (h.get("previousStatus") or "").upper()
to_status = (h.get("status") or "").upper()
event_status = to_status
kind = (h.get("kind") or "").lower()
is_status_change = bool(kind == "status" or from_status or to_status)

if email_filter:
haystack = f"{actor_email.lower()} {actor_name.lower()}".strip()
if email_filter not in haystack:
continue
if history_start_dt and (created_at_dt is None or created_at_dt < history_start_dt):
target_issues = []
if issue_id is not None:
target_issues = [{"id": str(issue_id)}]
else:
target_issues = _fetch_project_issues(company_id, project_id)
if not target_issues:
warning("No vulnerabilities found for the given project.")
raise typer.Exit()

for target in target_issues:
current_issue_id = target.get("id")
if not current_issue_id:
continue
if history_end_dt and (created_at_dt is None or created_at_dt > history_end_dt):

data = graphql_request(issue_timeline_query, {"id": str(current_issue_id)}, log_request=True, verbose_only=True)
issue = data.get("issue")
if not issue:
continue
if status_filter:
if not is_status_change:
history_rows = issue.get("history") or []

for h in history_rows:
action_type = (h.get("action") or "").upper()
actor_email = (h.get("authorEmail") or "").strip()
actor_name = actor_email.split("@", 1)[0] if actor_email else ""
created_at = h.get("at") or ""
created_at_dt = _safe_parse_iso(created_at)
from_status = (h.get("previousStatus") or "").upper()
to_status = (h.get("status") or "").upper()
event_status = to_status
kind = (h.get("kind") or "").lower()
is_status_change = bool(kind == "status" or from_status or to_status)

if email_filter:
haystack = f"{actor_email.lower()} {actor_name.lower()}".strip()
if email_filter not in haystack:
continue
if history_start_dt and (created_at_dt is None or created_at_dt < history_start_dt):
continue
if (to_status or event_status) != status_filter:
if history_end_dt and (created_at_dt is None or created_at_dt > history_end_dt):
continue
if status_filter:
if not is_status_change:
continue
if (to_status or event_status) != status_filter:
continue

rows.append({
"issueId": issue.get("id") or issue_id,
"issueTitle": issue.get("title") or "",
"currentIssueStatus": issue.get("status") or "",
"eventId": h.get("eventId") or "",
"createdAt": created_at,
"actorName": actor_name,
"actorEmail": actor_email,
"actionType": action_type,
"fromStatus": from_status,
"toStatus": to_status or event_status,
"statusChange": "true" if is_status_change else "false",
})
rows.append({
"projectId": str(project_id) if project_id is not None else "",
"issueId": issue.get("id") or current_issue_id,
"issueTitle": issue.get("title") or target.get("title") or "",
"currentIssueStatus": issue.get("status") or target.get("status") or "",
"eventId": h.get("eventId") or "",
"createdAt": created_at,
"actorName": actor_name,
"actorEmail": actor_email,
"actionType": action_type,
"fromStatus": from_status,
"toStatus": to_status or event_status,
"statusChange": "true" if is_status_change else "false",
})

if not rows:
warning("No timeline events found for the given filters.")
Expand All @@ -581,6 +660,7 @@ def vulnerability_timeline(
)
latest = status_rows[-1]
latest = {
"projectId": latest.get("projectId"),
"issueId": latest.get("issueId"),
"issueTitle": latest.get("issueTitle"),
"currentIssueStatus": latest.get("currentIssueStatus"),
Expand All @@ -591,17 +671,80 @@ def vulnerability_timeline(
"toStatus": latest.get("toStatus"),
"actionType": latest.get("actionType"),
}
export_data([latest], fmt=fmt, output=output, title=f"Vulnerability {issue_id} - Last Status Change")
if project_id is not None:
grouped_latest = {}
for r in status_rows:
key = str(r.get("issueId") or "")
curr = grouped_latest.get(key)
if curr is None:
grouped_latest[key] = r
continue
curr_dt = _safe_parse_iso(curr.get("createdAt") or "") or datetime.min.replace(tzinfo=timezone.utc)
r_dt = _safe_parse_iso(r.get("createdAt") or "") or datetime.min.replace(tzinfo=timezone.utc)
if r_dt > curr_dt or (r_dt == curr_dt and str(r.get("eventId") or "") > str(curr.get("eventId") or "")):
grouped_latest[key] = r
latest_rows = []
for r in grouped_latest.values():
latest_rows.append({
"projectId": r.get("projectId"),
"issueId": r.get("issueId"),
"issueTitle": r.get("issueTitle"),
"currentIssueStatus": r.get("currentIssueStatus"),
"lastChangedAt": r.get("createdAt"),
"lastChangedBy": r.get("actorName"),
"lastChangedByEmail": r.get("actorEmail"),
"fromStatus": r.get("fromStatus"),
"toStatus": r.get("toStatus"),
"actionType": r.get("actionType"),
})
latest_rows.sort(
key=lambda r: (
_safe_parse_iso(r.get("lastChangedAt") or "") or datetime.min.replace(tzinfo=timezone.utc),
str(r.get("issueId") or ""),
)
)
export_data(
latest_rows,
schema=timeline_last_schema,
fmt=fmt,
output=output,
title=f"Project {project_id} - Last Status Change Per Vulnerability",
)
summary(f"{len(latest_rows)} vulnerability(ies) with last status-change listed.")
return

export_data(
[latest],
schema=timeline_last_schema,
fmt=fmt,
output=output,
title=f"Vulnerability {issue_id} - Last Status Change",
)
summary("1 last status-change event listed.")
return

export_data(rows, fmt=fmt, output=output, title=f"Vulnerability {issue_id} - Timeline")
if project_id is not None:
export_data(
rows,
schema=timeline_schema,
fmt=fmt,
output=output,
title=f"Project {project_id} - Vulnerabilities Timeline",
)
else:
export_data(
rows,
schema=timeline_schema,
fmt=fmt,
output=output,
title=f"Vulnerability {issue_id} - Timeline",
)
summary(f"{len(rows)} timeline event(s) listed.")

except typer.Exit:
raise
except Exception as exc:
if "RECORD_NOT_FOUND" in str(exc):
if "RECORD_NOT_FOUND" in str(exc) and issue_id is not None:
error(f"Issue {issue_id} not found. Use the vulnerability ID (not project ID).")
raise typer.Exit(code=1)
error(f"Error listing vulnerability timeline: {exc}")
Expand Down
43 changes: 43 additions & 0 deletions src/conviso/schemas/vulnerabilities_timeline_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""
Vulnerabilities Timeline Schemas
--------------------------------
Friendly table/csv headers for vulnerability timeline outputs.
"""


class VulnerabilitiesTimelineSchema:
def __init__(self):
self.display_headers = {
"projectId": "Project ID",
"issueId": "Vulnerability ID",
"issueTitle": "Vulnerability",
"currentIssueStatus": "Current Status",
"eventId": "Event ID",
"createdAt": "Event At",
"actorName": "Changed By",
"actorEmail": "Changed By Email",
"actionType": "Action",
"fromStatus": "From Status",
"toStatus": "To Status",
"statusChange": "Status Change",
}


class VulnerabilitiesTimelineLastSchema:
def __init__(self):
self.display_headers = {
"projectId": "Project ID",
"issueId": "Vulnerability ID",
"issueTitle": "Vulnerability",
"currentIssueStatus": "Current Status",
"lastChangedAt": "Last Changed At",
"lastChangedBy": "Last Changed By",
"lastChangedByEmail": "Last Changed By Email",
"fromStatus": "From Status",
"toStatus": "To Status",
"actionType": "Action",
}


timeline_schema = VulnerabilitiesTimelineSchema()
timeline_last_schema = VulnerabilitiesTimelineLastSchema()