Skip to content

Commit b7e51a9

Browse files
committed
Add new importer
1 parent 9911790 commit b7e51a9

9 files changed

Lines changed: 659 additions & 13 deletions

File tree

pyproject.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ python_files = "*.py"
4040
python_classes = "Test"
4141
python_functions = "test"
4242

43+
filterwarnings = [
44+
"ignore:Unknown config option.*:pytest.PytestConfigWarning",
45+
'ignore:datetime.datetime.utcfromtimestamp\(\) is deprecated and scheduled for removal.*:DeprecationWarning',
46+
"ignore:CheckConstraint.check is deprecated in favor of `.condition`.:django.utils.deprecation.RemovedInDjango60Warning",
47+
]
48+
4349
addopts = [
4450
"-rfExXw",
4551
"--strict-markers",

vulnerabilities/importers/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
from vulnerabilities.pipelines.v2_importers import (
5555
elixir_security_importer as elixir_security_importer_v2,
5656
)
57+
from vulnerabilities.pipelines.v2_importers import enisa_nisa_importer as enisa_nisa_importer_v2
5758
from vulnerabilities.pipelines.v2_importers import epss_importer_v2
5859
from vulnerabilities.pipelines.v2_importers import fireeye_importer_v2
5960
from vulnerabilities.pipelines.v2_importers import gentoo_importer as gentoo_importer_v2
@@ -111,6 +112,7 @@
111112
ruby_importer_v2.RubyImporterPipeline,
112113
epss_importer_v2.EPSSImporterPipeline,
113114
cloudvulndb_importer_v2.CloudVulnDBImporterPipeline,
115+
enisa_nisa_importer_v2.EnisaNisaImporterPipeline,
114116
gentoo_importer_v2.GentooImporterPipeline,
115117
nginx_importer_v2.NginxImporterPipeline,
116118
debian_importer_v2.DebianImporterPipeline,

vulnerabilities/pipelines/v2_importers/cloudvulndb_importer.py

Lines changed: 204 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,19 @@
1010
import hashlib
1111
import json
1212
import logging
13+
from pathlib import Path
1314
from typing import Iterable
1415
from urllib.parse import urlparse
1516
from xml.etree import ElementTree
1617

1718
from dateutil import parser as dateutil_parser
19+
from fetchcode.vcs import fetch_via_vcs
20+
import saneyaml
1821

1922
from vulnerabilities.importer import AdvisoryDataV2
2023
from vulnerabilities.importer import ReferenceV2
2124
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
25+
from vulnerabilities.utils import get_advisory_url
2226
from vulnerabilities.utils import fetch_response
2327
from vulnerabilities.utils import find_all_cve
2428

@@ -28,7 +32,7 @@
2832

2933

3034
class CloudVulnDBImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
31-
"""Collect cloud vulnerabilities from the public CloudVulnDB RSS feed."""
35+
"""Collect cloud vulnerabilities from CloudVulnDB structured data files."""
3236

3337
pipeline_id = "cloudvulndb_importer_v2"
3438
spdx_license_expression = "CC-BY-4.0"
@@ -40,7 +44,57 @@ class CloudVulnDBImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
4044

4145
@classmethod
4246
def steps(cls):
43-
return (cls.collect_and_store_advisories,)
47+
return (
48+
cls.clone,
49+
cls.collect_and_store_advisories,
50+
cls.clean_downloads,
51+
)
52+
53+
def clone(self):
    """
    Fetch the repository at ``self.repo_url`` with ``fetch_via_vcs`` and keep the
    returned response on ``self.vcs_response`` for the later pipeline steps.
    """
    repo_url = self.repo_url
    self.log(f"Cloning `{repo_url}`")
    self.vcs_response = fetch_via_vcs(repo_url)
56+
57+
def clean_downloads(self):
    """
    Delete the cloned repository, if there is one.

    Safe to call when ``clone`` never completed (``vcs_response`` may not have been
    assigned yet, e.g. when invoked from ``on_failure``) and safe to call repeatedly:
    the response is cleared after deletion so it is never deleted twice.
    """
    # getattr: the attribute only exists once ``clone`` has succeeded.
    vcs_response = getattr(self, "vcs_response", None)
    if not vcs_response:
        return

    self.log("Removing cloned repository")
    vcs_response.delete()
    self.vcs_response = None
61+
62+
def on_failure(self):
    """
    Delegate to ``clean_downloads`` so the clone is removed.

    NOTE(review): presumably invoked by the base pipeline when a step fails - confirm.
    """
    self.clean_downloads()
64+
65+
def _iter_structured_files(self):
66+
base_directory = Path(self.vcs_response.dest_dir)
67+
68+
for file_path in base_directory.rglob("*"):
69+
if not file_path.is_file():
70+
continue
71+
72+
suffix = file_path.suffix.lower()
73+
if suffix not in (".json", ".yaml", ".yml"):
74+
continue
75+
76+
yield file_path
77+
78+
def _load_file_items(self, file_path: Path):
79+
text = file_path.read_text(encoding="utf-8", errors="replace")
80+
suffix = file_path.suffix.lower()
81+
82+
if suffix == ".json":
83+
data = json.loads(text)
84+
else:
85+
data = saneyaml.load(text)
86+
87+
if isinstance(data, list):
88+
return data
89+
90+
if isinstance(data, dict):
91+
for key in ("vulnerabilities", "advisories", "items", "data"):
92+
nested = data.get(key)
93+
if isinstance(nested, list):
94+
return nested
95+
return [data]
96+
97+
return []
4498

4599
def get_feed_items(self):
46100
if self._cached_items is None:
@@ -49,15 +103,157 @@ def get_feed_items(self):
49103
return self._cached_items
50104

51105
def advisories_count(self) -> int:
    """
    Return the number of records found in the structured files, or the number of
    feed items when the structured files contribute nothing.

    Files that fail to load are counted as zero records.
    """

    def record_count(path):
        try:
            return len(self._load_file_items(path))
        except Exception:
            return 0

    structured_total = sum(record_count(path) for path in self._iter_structured_files())
    if structured_total:
        return structured_total

    return len(self.get_feed_items())
53117

54118
def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
    """
    Yield advisories parsed from the structured files of the cloned repository.

    Files that cannot be parsed are logged as warnings and skipped. When no structured
    advisory at all was yielded, the items returned by ``get_feed_items`` are parsed
    and yielded instead.
    """
    repo_root = Path(self.vcs_response.dest_dir)
    emitted = 0

    for path in self._iter_structured_files():
        try:
            entries = self._load_file_items(path)
        except Exception as error:
            self.log(
                f"Failed to parse structured file {path}: {error}",
                level=logging.WARNING,
            )
            continue

        if not entries:
            continue

        file_url = get_advisory_url(
            file=path,
            base_path=repo_root,
            url="https://github.com/wiz-sec/open-cvdb/blob/main/",
        )

        parsed = (
            parse_structured_advisory_data(item=entry, advisory_url=file_url)
            for entry in entries
        )
        # filter(None, ...) drops the entries the parser rejected.
        for advisory in filter(None, parsed):
            emitted += 1
            yield advisory

    if emitted:
        return

    self.log("No structured YAML/JSON advisories found, falling back to RSS feed")
    yield from filter(None, map(parse_rss_advisory_data, self.get_feed_items()))
59155

60156

157+
def parse_structured_advisory_data(item: dict, advisory_url: str):
    """
    Parse one structured advisory object from YAML/JSON.

    This parser is intentionally tolerant and can emit advisories without packages,
    which is required for SaaS advisories where a PURL may not exist yet.

    ``item`` is one record loaded from a structured file and ``advisory_url`` is the
    URL of that file. Return an AdvisoryDataV2, or None when ``item`` is not a mapping
    or when no advisory id can be determined for it.
    """
    if not isinstance(item, dict):
        return None

    advisory_id = str(
        _first_truthy_value(item, ("id", "advisory_id", "uid", "slug", "name"))
    ).strip()
    title = str(_first_truthy_value(item, ("title", "summary"))).strip()
    description = str(_first_truthy_value(item, ("description", "details"))).strip()

    date_value = _first_truthy_value(item, ("published", "published_at", "date"))
    date_published = _parse_structured_date(date_value)

    aliases = _collect_structured_aliases(item=item, title=title, description=description)

    if not advisory_id:
        # No explicit identifier: derive one from the file URL, title and date.
        advisory_id = get_advisory_id(
            guid="",
            link=advisory_url,
            title=title,
            pub_date=str(date_value or ""),
        )

    if not advisory_id:
        return None

    references = [
        ReferenceV2(url=url)
        for url in _collect_structured_reference_urls(item=item, advisory_url=advisory_url)
    ]

    return AdvisoryDataV2(
        advisory_id=advisory_id,
        aliases=[alias for alias in aliases if alias != advisory_id],
        summary=title or description or advisory_id,
        affected_packages=[],
        references=references,
        date_published=date_published,
        url=advisory_url,
        original_advisory_text=json.dumps(item, indent=2, ensure_ascii=False),
    )


def _first_truthy_value(item, keys):
    """Return the first truthy ``item[key]`` for ``keys`` in order, or ``""`` if none."""
    for key in keys:
        value = item.get(key)
        if value:
            return value
    return ""


def _parse_structured_date(date_value):
    """Return ``date_value`` parsed by dateutil, or None when empty or unparseable."""
    if not date_value:
        return None
    try:
        return dateutil_parser.parse(str(date_value))
    except Exception:
        # Tolerant by design: an unreadable date must not discard the advisory.
        return None


def _collect_structured_aliases(item, title, description):
    """Return the de-duplicated aliases of ``item``, in first-seen order."""
    aliases = []

    alias_candidates = item.get("aliases")
    if isinstance(alias_candidates, list):
        for alias in alias_candidates:
            alias_text = str(alias).strip()
            if alias_text:
                # Keep non-CVE aliases verbatim; reduce CVE-bearing text to the CVE ids.
                aliases.extend(find_all_cve(alias_text) or [alias_text])

    for key in ("cve", "cve_id", "cve_ids"):
        value = item.get(key)
        if isinstance(value, str):
            aliases.extend(find_all_cve(value))
        elif isinstance(value, list):
            for entry in value:
                aliases.extend(find_all_cve(str(entry)))

    # Structured records often only mention CVEs in free text fields.
    aliases.extend(find_all_cve(description))
    aliases.extend(find_all_cve(title))

    return list(dict.fromkeys(alias for alias in aliases if alias))


def _collect_structured_reference_urls(item, advisory_url):
    """Return the stripped, de-duplicated reference URLs of ``item``, in first-seen order."""
    urls = []

    refs = item.get("references")
    if isinstance(refs, list):
        for ref in refs:
            if isinstance(ref, str):
                urls.append(ref)
            elif isinstance(ref, dict):
                # Only the first populated URL-like key of a reference mapping is used.
                for key in ("url", "href", "link"):
                    target = ref.get(key)
                    if target:
                        urls.append(str(target))
                        break

    source_url = _first_truthy_value(item, ("url", "source")) or advisory_url
    if source_url:
        urls.append(str(source_url))

    return list(dict.fromkeys(url.strip() for url in urls if url.strip()))
255+
256+
61257
def parse_rss_feed(xml_text: str) -> list:
62258
"""
63259
Parse CloudVulnDB RSS XML and return a list of item dictionaries.
@@ -89,7 +285,7 @@ def parse_rss_feed(xml_text: str) -> list:
89285
return items
90286

91287

92-
def parse_advisory_data(item: dict):
288+
def parse_rss_advisory_data(item: dict):
93289
"""
94290
Parse one CloudVulnDB item and return an AdvisoryDataV2 object.
95291
Since the RSS feed does not provide package/version coordinates, ``affected_packages`` is empty.
@@ -133,6 +329,10 @@ def parse_advisory_data(item: dict):
133329
)
134330

135331

332+
# Backward-compatible alias used by existing tests/imports.
333+
parse_advisory_data = parse_rss_advisory_data
334+
335+
136336
def get_advisory_id(guid: str, link: str, title: str, pub_date: str) -> str:
137337
"""
138338
Return a stable advisory identifier using the best available source.

0 commit comments

Comments
 (0)