From 0dfd93e3ffc50ace56971924468bb08dac468304 Mon Sep 17 00:00:00 2001 From: Michael Ehab Mikhail Date: Mon, 18 Aug 2025 17:54:52 +0300 Subject: [PATCH 1/2] Add NPM Live V2 Importer Pipeline #1936 * Add NPM Live V2 Importer * Add tests for the NPM Live V2 Importer * Tested functionally using the Live Evaluation API in #1969 Signed-off-by: Michael Ehab Mikhail --- vulnerabilities/importers/__init__.py | 7 + .../v2_importers/npm_live_importer.py | 88 ++++++++++++ .../test_npm_live_importer_pipeline_v2.py | 133 ++++++++++++++++++ 3 files changed, 228 insertions(+) create mode 100644 vulnerabilities/pipelines/v2_importers/npm_live_importer.py create mode 100644 vulnerabilities/tests/pipelines/v2_importers/test_npm_live_importer_pipeline_v2.py diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index c0cf04ed7..5839d2e20 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -62,6 +62,7 @@ from vulnerabilities.pipelines.v2_importers import mozilla_importer as mozilla_importer_v2 from vulnerabilities.pipelines.v2_importers import nginx_importer as nginx_importer_v2 from vulnerabilities.pipelines.v2_importers import npm_importer as npm_importer_v2 +from vulnerabilities.pipelines.v2_importers import npm_live_importer as npm_live_importer_v2 from vulnerabilities.pipelines.v2_importers import nvd_importer as nvd_importer_v2 from vulnerabilities.pipelines.v2_importers import openssl_importer as openssl_importer_v2 from vulnerabilities.pipelines.v2_importers import oss_fuzz as oss_fuzz_v2 @@ -196,3 +197,9 @@ for key, value in IMPORTERS_REGISTRY.items() if issubclass(value, VulnerableCodeBaseImporterPipelineV2) and value.exclude_from_package_todo ] + +LIVE_IMPORTERS_REGISTRY = create_registry( + [ + npm_live_importer_v2.NpmLiveImporterPipeline, + ] +) diff --git a/vulnerabilities/pipelines/v2_importers/npm_live_importer.py b/vulnerabilities/pipelines/v2_importers/npm_live_importer.py new file mode 100644 index 000000000..4e6b6b8cc --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/npm_live_importer.py @@ -0,0 +1,88 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +from pathlib import Path +from typing import Iterable + +from packageurl import PackageURL +from univers.versions import SemverVersion + +from vulnerabilities.importer import AdvisoryData +from vulnerabilities.pipelines.v2_importers.npm_importer import NpmImporterPipeline +from vulnerabilities.utils import load_json + + +class NpmLiveImporterPipeline(NpmImporterPipeline): + """ + Node.js Security Working Group importer pipeline + + Import advisories from nodejs security working group including node proper advisories and npm advisories for a single PURL. + """ + + pipeline_id = "nodejs_security_wg_live_importer" + supported_types = ["npm"] + + @classmethod + def steps(cls): + return ( + cls.get_purl_inputs, + cls.clone, + cls.collect_and_store_advisories, + cls.clean_downloads, + ) + + def get_purl_inputs(self): + purl = self.inputs["purl"] + if not purl: + raise ValueError("PURL is required for NpmLiveImporterPipeline") + + if isinstance(purl, str): + purl = PackageURL.from_string(purl) + + if not isinstance(purl, PackageURL): + raise ValueError(f"Object of type {type(purl)} {purl!r} is not a PackageURL instance") + + if purl.type not in self.supported_types: + raise ValueError( + f"PURL: {purl!s} is not among the supported package types {self.supported_types!r}" + ) + + if not purl.version: + raise ValueError(f"PURL: {purl!s} is expected to have a version") + + self.purl = purl + + def collect_advisories(self) -> Iterable[AdvisoryData]: + vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" + advisory_files = list(vuln_directory.glob("*.json")) + + package_name = self.purl.name + filtered_files = [] + for advisory_file in advisory_files: + try: + data = load_json(advisory_file) + if data.get("module_name") == package_name: + affected_package = self.get_affected_package(data, package_name) + if not self.purl.version or self._version_is_affected(affected_package): + filtered_files.append(advisory_file) + except Exception as e: + self.log(f"Error processing advisory file {advisory_file}: {str(e)}") + advisory_files = filtered_files + + for advisory in list(advisory_files): + result = self.to_advisory_data(advisory) + if result: + yield result + + def _version_is_affected(self, affected_package): + if not self.purl.version or not affected_package.affected_version_range: + return True + + purl_version = SemverVersion(self.purl.version) + return purl_version in affected_package.affected_version_range diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_npm_live_importer_pipeline_v2.py b/vulnerabilities/tests/pipelines/v2_importers/test_npm_live_importer_pipeline_v2.py new file mode 100644 index 000000000..1c14ec567 --- /dev/null +++ b/vulnerabilities/tests/pipelines/v2_importers/test_npm_live_importer_pipeline_v2.py @@ -0,0 +1,133 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import json +import os +from pathlib import Path +from types import SimpleNamespace + +import pytest +from packageurl import PackageURL +from univers.version_constraint import VersionConstraint +from univers.version_range import NpmVersionRange +from univers.versions import SemverVersion + +from vulnerabilities.importer import AffectedPackageV2 +from vulnerabilities.pipelines.v2_importers.npm_live_importer import NpmLiveImporterPipeline + +TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "npm" + + +def test_package_first_mode_valid_npm_package(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="npm", name="npm", version="1.2.0") + pipeline = NpmLiveImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + pipeline.get_purl_inputs() + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 1 + assert advisories[0].aliases == ["CVE-2013-4116"] + assert len(advisories[0].affected_packages) == 1 + assert advisories[0].affected_packages[0].package.name == "npm" + + +def test_package_first_mode_unaffected_version(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="npm", name="npm", version="1.4.0") + pipeline = NpmLiveImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + pipeline.get_purl_inputs() + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +def test_package_first_mode_invalid_package_type(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="pypi", name="django", version="3.0.0") + pipeline = NpmLiveImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + with pytest.raises(ValueError): + pipeline.get_purl_inputs() + + +def test_package_first_mode_package_not_found(tmp_path): + vuln_dir = tmp_path / "vuln" / "npm" + vuln_dir.mkdir(parents=True) + + npm_sample_file = os.path.join(TEST_DATA, "npm_sample.json") + with open(npm_sample_file) as f: + sample_data = json.load(f) + + sample_data["module_name"] = "some-other-package" + + advisory_file = vuln_dir / "152.json" + advisory_file.write_text(json.dumps(sample_data)) + + mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) + + purl = PackageURL(type="npm", name="nonexistent-package", version="1.0.0") + pipeline = NpmLiveImporterPipeline(purl=purl) + pipeline.vcs_response = mock_vcs_response + + pipeline.get_purl_inputs() + advisories = list(pipeline.collect_advisories()) + + assert len(advisories) == 0 + + +def test_version_is_affected(): + purl = PackageURL(type="npm", name="npm", version="1.2.0") + pipeline = NpmLiveImporterPipeline(purl=purl) + pipeline.get_purl_inputs() + + affected_package = AffectedPackageV2( + package=PackageURL(type="npm", name="npm"), + affected_version_range=NpmVersionRange( + constraints=(VersionConstraint(comparator="<", version=SemverVersion(string="1.3.3")),) + ), + ) + + assert pipeline._version_is_affected(affected_package) == True + + pipeline.purl = PackageURL(type="npm", name="npm", version="1.4.0") + assert pipeline._version_is_affected(affected_package) == False + + pipeline.purl = PackageURL(type="npm", name="npm") + assert pipeline._version_is_affected(affected_package) == True From 368a9ec0c52d331b47d62f0e8ef3e01185117dde Mon Sep 17 00:00:00 2001 From: ziad hany Date: Wed, 13 May 2026 14:37:01 +0300 Subject: [PATCH 2/2] Refactor the code for npm_live importer Signed-off-by: ziad hany --- .../v2_importers/npm_live_importer.py | 47 ++++++++++--------- .../test_npm_live_importer_pipeline_v2.py | 20 ++++---- .../tests/test_data/npm_live/npm_sample.json | 24 ++++++++++ .../npm_live/parse-advisory-npm-expected.json | 40 ++++++++++++++++ 4 files changed, 98 insertions(+), 33 deletions(-) create mode 100644 vulnerabilities/tests/test_data/npm_live/npm_sample.json create mode 100644 vulnerabilities/tests/test_data/npm_live/parse-advisory-npm-expected.json diff --git a/vulnerabilities/pipelines/v2_importers/npm_live_importer.py b/vulnerabilities/pipelines/v2_importers/npm_live_importer.py index 4e6b6b8cc..771d4e3ad 100644 --- a/vulnerabilities/pipelines/v2_importers/npm_live_importer.py +++ b/vulnerabilities/pipelines/v2_importers/npm_live_importer.py @@ -11,9 +11,10 @@ from typing import Iterable from packageurl import PackageURL +from univers.versions import InvalidVersion from univers.versions import SemverVersion -from vulnerabilities.importer import AdvisoryData +from vulnerabilities.importer import AdvisoryDataV2 from vulnerabilities.pipelines.v2_importers.npm_importer import NpmImporterPipeline from vulnerabilities.utils import load_json @@ -58,31 +59,35 @@ def get_purl_inputs(self): self.purl = purl - def collect_advisories(self) -> Iterable[AdvisoryData]: + def collect_advisories(self) -> Iterable[AdvisoryDataV2]: vuln_directory = Path(self.vcs_response.dest_dir) / "vuln" / "npm" - advisory_files = list(vuln_directory.glob("*.json")) - package_name = self.purl.name filtered_files = [] - for advisory_file in advisory_files: - try: - data = load_json(advisory_file) - if data.get("module_name") == package_name: - affected_package = self.get_affected_package(data, package_name) - if not self.purl.version or self._version_is_affected(affected_package): - filtered_files.append(advisory_file) - except Exception as e: - self.log(f"Error processing advisory file {advisory_file}: {str(e)}") - advisory_files = filtered_files - - for advisory in list(advisory_files): + for advisory_file in vuln_directory.glob("*.json"): + data = load_json(advisory_file) + if data.get("module_name") == package_name: + affected_package = self.get_affected_package(data, package_name) + if not self.purl.version or self._version_is_related(affected_package): + filtered_files.append(advisory_file) + + for advisory in filtered_files: result = self.to_advisory_data(advisory) if result: yield result - def _version_is_affected(self, affected_package): - if not self.purl.version or not affected_package.affected_version_range: + def _version_is_related(self, affected_package): + try: + package_version = SemverVersion(self.purl.version) + except InvalidVersion as e: + self.log(f"Invalid PURL version: {self.purl.version!r}: {str(e)}") + return False + + if ( + affected_package.affected_version_range + and package_version in affected_package.affected_version_range + ) or ( + affected_package.fixed_version_range + and package_version in affected_package.fixed_version_range + ): return True - - purl_version = SemverVersion(self.purl.version) - return purl_version in affected_package.affected_version_range + return False diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_npm_live_importer_pipeline_v2.py b/vulnerabilities/tests/pipelines/v2_importers/test_npm_live_importer_pipeline_v2.py index 1c14ec567..e68d1833c 100644 --- a/vulnerabilities/tests/pipelines/v2_importers/test_npm_live_importer_pipeline_v2.py +++ b/vulnerabilities/tests/pipelines/v2_importers/test_npm_live_importer_pipeline_v2.py @@ -20,8 +20,9 @@ from vulnerabilities.importer import AffectedPackageV2 from vulnerabilities.pipelines.v2_importers.npm_live_importer import NpmLiveImporterPipeline +from vulnerabilities.tests import util_tests -TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "npm" +TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "npm_live" def test_package_first_mode_valid_npm_package(tmp_path): @@ -37,17 +38,15 @@ def test_package_first_mode_valid_npm_package(tmp_path): mock_vcs_response = SimpleNamespace(dest_dir=str(tmp_path), delete=lambda: None) - purl = PackageURL(type="npm", name="npm", version="1.2.0") + purl = PackageURL(type="npm", name="decamelize", version="1.1.0") pipeline = NpmLiveImporterPipeline(purl=purl) pipeline.vcs_response = mock_vcs_response pipeline.get_purl_inputs() - advisories = list(pipeline.collect_advisories()) - assert len(advisories) == 1 - assert advisories[0].aliases == ["CVE-2013-4116"] - assert len(advisories[0].affected_packages) == 1 - assert advisories[0].affected_packages[0].package.name == "npm" + result = [adv.to_dict() for adv in pipeline.collect_advisories()] + expected_file = Path(TEST_DATA / "parse-advisory-npm-expected.json") + util_tests.check_results_against_json(result, expected_file) def test_package_first_mode_unaffected_version(tmp_path): @@ -124,10 +123,7 @@ def test_version_is_affected(): ), ) - assert pipeline._version_is_affected(affected_package) == True + assert pipeline._version_is_related(affected_package) == True pipeline.purl = PackageURL(type="npm", name="npm", version="1.4.0") - assert pipeline._version_is_affected(affected_package) == False - - pipeline.purl = PackageURL(type="npm", name="npm") - assert pipeline._version_is_affected(affected_package) == True + assert pipeline._version_is_related(affected_package) == False diff --git a/vulnerabilities/tests/test_data/npm_live/npm_sample.json b/vulnerabilities/tests/test_data/npm_live/npm_sample.json new file mode 100644 index 000000000..f73bf9162 --- /dev/null +++ b/vulnerabilities/tests/test_data/npm_live/npm_sample.json @@ -0,0 +1,24 @@ +{ + "id": 308, + "created_at": "2017-01-26", + "updated_at": "2017-04-14", + "title": "Regular Expression Denial of Service", + "author": { + "name": "saurik", + "website": null, + "username": null + }, + "module_name": "decamelize", + "publish_date": "2017-04-14", + "cves": [], + "vulnerable_versions": ">=1.1.0 <=1.1.1", + "patched_versions": ">=1.1.2", + "overview": "Decamelize is used to convert a dash/dot/underscore/space separated string to camelCase. \n\nDecamelize uses regular expressions to evaluate a string and takes unescaped separator values, which can be used to create a denial of service attack.", + "recommendation": "Upgrade to version 1.1.2 or later.", + "references": [ + "https://github.com/sindresorhus/decamelize/issues/5" + ], + "cvss_vector": null, + "cvss_score": null, + "coordinating_vendor": "^Lift Security" +} \ No newline at end of file diff --git a/vulnerabilities/tests/test_data/npm_live/parse-advisory-npm-expected.json b/vulnerabilities/tests/test_data/npm_live/parse-advisory-npm-expected.json new file mode 100644 index 000000000..306148f80 --- /dev/null +++ b/vulnerabilities/tests/test_data/npm_live/parse-advisory-npm-expected.json @@ -0,0 +1,40 @@ +[ + { + "advisory_id": "npm-308", + "aliases": [], + "summary": "Regular Expression Denial of Service\nDecamelize is used to convert a dash/dot/underscore/space separated string to camelCase. \n\nDecamelize uses regular expressions to evaluate a string and takes unescaped separator values, which can be used to create a denial of service attack.", + "affected_packages": [ + { + "package": { + "type": "npm", + "namespace": "", + "name": "decamelize", + "version": "", + "qualifiers": "", + "subpath": "" + }, + "affected_version_range": "vers:npm/>=1.1.0|<=1.1.1", + "fixed_version_range": "vers:npm/>=1.1.2", + "introduced_by_commit_patches": [], + "fixed_by_commit_patches": [] + } + ], + "references": [ + { + "reference_id": "", + "reference_type": "", + "url": "https://github.com/sindresorhus/decamelize/issues/5" + }, + { + "reference_id": "308", + "reference_type": "", + "url": "https://github.com/nodejs/security-wg/blob/main/vuln/npm/308.json" + } + ], + "patches": [], + "severities": [], + "date_published": "2017-01-26T00:00:00+00:00", + "weaknesses": [], + "url": "https://github.com/nodejs/security-wg/blob/main/vuln/npm/308.json" + } +] \ No newline at end of file