Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/content/supported_tools/parsers/file/govulncheck.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,29 @@ toc_hide: true
---
JSON vulnerability report generated by govulncheck tool, using a command like `govulncheck -json . >> report.json`

### Govulncheck Scanner V2

A second scan type, **Govulncheck Scanner V2**, is available for the streaming JSON
format (`govulncheck -format json ./...`). It addresses several limitations of the
original parser:

- It iterates the `finding` records instead of the `osv` advisory definitions, so
advisories that are present in the vulnerability database stream but do not apply
to the scanned code are no longer imported (this previously inflated the finding
count).
- The Go vulnerability database does not publish CVSS scores, so severity is derived
from govulncheck's reachability level, kept separate per tier:
- `symbol` (the vulnerable symbol is called) → **High**
- `package` (the vulnerable package is imported) → **Low**
- `module` (the vulnerable module is required) → **Info**

Use the *Minimum Severity* import option (e.g. `High`) to keep only the reachable
findings, matching the default `govulncheck ./...` output.
- One finding is produced per (advisory, module) pair so multi-module advisories map
to the correct vulnerable components.

The original **Govulncheck Scanner** parser is unchanged and remains available.

### Sample Scan Data
Sample Govulncheck scans can be found [here](https://github.com/DefectDojo/django-DefectDojo/tree/master/unittests/scans/govulncheck).

Expand Down
1 change: 1 addition & 0 deletions dojo/settings/settings.dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,7 @@ def generate_url(scheme, double_slashes, user, password, host, port, path, param
"GitLab Dependency Scanning Report": DEDUPE_ALGO_HASH_CODE,
"GitLab SAST Report": DEDUPE_ALGO_HASH_CODE,
"Govulncheck Scanner": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL,
"Govulncheck Scanner V2": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL,
"GitLab Container Scan": DEDUPE_ALGO_HASH_CODE,
"GitLab Secret Detection Report": DEDUPE_ALGO_HASH_CODE,
"Checkov Scan": DEDUPE_ALGO_HASH_CODE,
Expand Down
200 changes: 200 additions & 0 deletions dojo/tools/govulncheck/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,34 @@
SEVERITY = "Info"


def load_govulncheck_stream(scan_file):
"""
Load govulncheck output that may be a single JSON document or a stream of concatenated JSON objects.

Returns the parsed object (dict for the old format, list for the new streaming format).
Raises ValueError on unparseable input.
"""
try:
return json.load(scan_file)
except json.JSONDecodeError:
scan_file.seek(0)
data = []
buf = ""
for line in scan_file:
if not line.strip():
continue
buf += line.decode("utf-8") if isinstance(line, bytes) else line
try:
data.append(json.loads(buf))
buf = ""
except json.JSONDecodeError:
continue
if not data:
msg = "Invalid JSON format"
raise ValueError(msg)
return data


class GovulncheckParser:
def get_scan_types(self):
return ["Govulncheck Scanner"]
Expand Down Expand Up @@ -249,3 +277,175 @@ def get_findings(self, scan_file, test):
)
findings.append(finding)
return findings


class GovulncheckParserV2:

"""
Govulncheck parser v2.

Iterates the ``finding`` records of the streaming JSON output instead of the
``osv`` advisory definitions. This:

* Drops advisories that are present in the vulnerability database stream but
do not actually apply to the scanned code (the old parser imported these,
inflating the finding count).
* Derives a severity from govulncheck's reachability level, since the Go
vulnerability database does not provide CVSS scores:
- ``symbol`` (vulnerable symbol is called) -> High
- ``package`` (vulnerable package imported) -> Low
- ``module`` (vulnerable module required) -> Info
* Emits one finding per (osv, module) pair so multi-module advisories map to
the correct vulnerable components.
"""

# govulncheck reachability level -> DefectDojo severity.
# Kept separate per level so each reachability tier is distinguishable.
LEVEL_SEVERITY = {
"symbol": "High",
"package": "Low",
"module": "Info",
}
LEVEL_RANK = {"module": 1, "package": 2, "symbol": 3}

def get_scan_types(self):
return ["Govulncheck Scanner V2"]

def get_label_for_scan_types(self, scan_type):
return scan_type

def get_description_for_scan_types(self, scan_type):
return (
"Import Govulncheck Scanner findings in JSON format. V2 derives "
"severity from reachability and maps findings to components."
)

@staticmethod
def get_level(trace_entry):
if "function" in trace_entry:
return "symbol"
if "package" in trace_entry:
return "package"
return "module"

@staticmethod
def format_trace(trace):
# trace[0] is the vulnerable sink, the last entry is the entry point in user code.
lines = []
for entry in trace:
module = entry.get("module", "Unknown module")
version = entry.get("version", "")
package = entry.get("package", "")
function = entry.get("function", "")
pos = entry.get("position", {})
location = ""
if pos:
location = f"{pos.get('filename', '')}:{pos.get('line', '')}:{pos.get('column', '')}"
symbol = ".".join(p for p in (package, function) if p)
parts = [f"Module: {module}@{version}" if version else f"Module: {module}"]
if symbol:
parts.append(f"Symbol: {symbol}")
if location:
parts.append(f"Location: {location}")
lines.append("\t" + ", ".join(parts))
return "\n".join(lines)

def get_findings(self, scan_file, test):
data = load_govulncheck_stream(scan_file)
# The v2 parser only targets the new streaming format (a list of objects).
if not isinstance(data, list):
return []

osv_defs = {
elem["osv"]["id"]: elem["osv"]
for elem in data
if isinstance(elem, dict) and "osv" in elem
}

# Group findings by (osv_id, module).
groups = {}
for elem in data:
if not (isinstance(elem, dict) and "finding" in elem):
continue
finding = elem["finding"]
osv_id = finding.get("osv")
trace = finding.get("trace") or [{}]
sink = trace[0]
module = sink.get("module", "Unknown")
key = (osv_id, module)
level = self.get_level(sink)
group = groups.setdefault(key, {
"osv_id": osv_id,
"module": module,
"version": sink.get("version", ""),
"package": sink.get("package", ""),
"level": level,
"fixed_version": finding.get("fixed_version", ""),
"traces": [],
})
# Keep the highest reachability level seen for this component.
if self.LEVEL_RANK[level] > self.LEVEL_RANK[group["level"]]:
group["level"] = level
if not group["fixed_version"]:
group["fixed_version"] = finding.get("fixed_version", "")
if len(trace) > 1 or "function" in sink:
group["traces"].append(trace)

findings = []
for group in groups.values():
osv = osv_defs.get(group["osv_id"], {})
osv_id = group["osv_id"]
module = group["module"]
version = group["version"]
level = group["level"]
fixed_version = group["fixed_version"]

aliases = osv.get("aliases") or []
cve = aliases[0] if aliases else None
summary = osv.get("summary", "")
details = osv.get("details", "")
db_specific = osv.get("database_specific", {}) or {}
url = db_specific.get("url", "")

title = f"{osv_id} - {module}"

description_parts = [
f"**Summary:** {summary or 'Unknown'}",
f"**Vulnerable module:** {module}@{version}" if version else f"**Vulnerable module:** {module}",
f"**Reachability:** {level} ({self.LEVEL_SEVERITY[level]} severity)",
]
if details:
description_parts.append(f"**Details:** {details}")
if group["traces"]:
trace_blocks = "\n".join(self.format_trace(t) for t in group["traces"])
description_parts.append(f"**Traces:**\n{trace_blocks}")
description = "\n".join(description_parts)

references = [
f"{ref.get('type', 'WEB')}: {ref['url']}"
for ref in osv.get("references", [])
if ref.get("url")
]
if url:
references.append(f"Database: {url}")

d = {
"title": title,
"severity": self.LEVEL_SEVERITY[level],
"cve": cve,
"component_name": module,
"component_version": version,
"description": description,
"references": "\n".join(references),
"fix_available": bool(fixed_version),
"fix_version": fixed_version,
"url": url,
"unique_id_from_tool": f"{osv_id}:{module}",
}
finding = Finding(**d)
if settings.V3_FEATURE_LOCATIONS and module:
finding.unsaved_locations.append(
LocationData.dependency(purl_type="golang", name=module, version=version),
)
findings.append(finding)
return findings
Loading
Loading