diff --git a/pyproject.toml b/pyproject.toml index 3b5645499..d5b7fc1c1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,10 +40,21 @@ python_files = "*.py" python_classes = "Test" python_functions = "test" +filterwarnings = [ + "ignore:Unknown config option.*:pytest.PytestConfigWarning", + 'ignore:datetime.datetime.utcfromtimestamp\(\) is deprecated and scheduled for removal.*:DeprecationWarning', + "ignore:CheckConstraint.check is deprecated in favor of `.condition`.:django.utils.deprecation.RemovedInDjango60Warning", +] + addopts = [ "-rfExXw", "--strict-markers", "--doctest-modules", + # setup.py imports setuptools which is not available in the Docker runtime + # image. Without this, pytest (which uses python_files = "*.py") tries to + # collect setup.py as a test module and crashes with exit code 2. + "--ignore=setup.py", + "--ignore-glob=*/setup.py", # Ignore the following doctests until these files are migrated to # import-improve structure "--ignore=vulnerabilities/importers/apache_httpd.py", diff --git a/vulnerabilities/importers/__init__.py b/vulnerabilities/importers/__init__.py index dd2c504ce..91bccd9e2 100644 --- a/vulnerabilities/importers/__init__.py +++ b/vulnerabilities/importers/__init__.py @@ -45,12 +45,14 @@ from vulnerabilities.pipelines.v2_importers import apache_kafka_importer as apache_kafka_importer_v2 from vulnerabilities.pipelines.v2_importers import apache_tomcat_importer as apache_tomcat_v2 from vulnerabilities.pipelines.v2_importers import archlinux_importer as archlinux_importer_v2 +from vulnerabilities.pipelines.v2_importers import cloudvulndb_importer as cloudvulndb_importer_v2 from vulnerabilities.pipelines.v2_importers import collect_fix_commits as collect_fix_commits_v2 from vulnerabilities.pipelines.v2_importers import curl_importer as curl_importer_v2 from vulnerabilities.pipelines.v2_importers import debian_importer as debian_importer_v2 from vulnerabilities.pipelines.v2_importers import ( elixir_security_importer as 
elixir_security_importer_v2, ) +from vulnerabilities.pipelines.v2_importers import enisa_nisa_importer as enisa_nisa_importer_v2 from vulnerabilities.pipelines.v2_importers import epss_importer_v2 from vulnerabilities.pipelines.v2_importers import fireeye_importer_v2 from vulnerabilities.pipelines.v2_importers import gentoo_importer as gentoo_importer_v2 @@ -107,6 +109,8 @@ project_kb_msr2019_importer_v2.ProjectKBMSR2019Pipeline, ruby_importer_v2.RubyImporterPipeline, epss_importer_v2.EPSSImporterPipeline, + cloudvulndb_importer_v2.CloudVulnDBImporterPipeline, + enisa_nisa_importer_v2.EnisaNisaImporterPipeline, gentoo_importer_v2.GentooImporterPipeline, nginx_importer_v2.NginxImporterPipeline, debian_importer_v2.DebianImporterPipeline, diff --git a/vulnerabilities/pipelines/v2_importers/cloudvulndb_importer.py b/vulnerabilities/pipelines/v2_importers/cloudvulndb_importer.py new file mode 100644 index 000000000..90e3f2c12 --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/cloudvulndb_importer.py @@ -0,0 +1,371 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. 
+# + +import hashlib +import json +import logging +from pathlib import Path +from typing import Iterable +from urllib.parse import urlparse +from xml.etree import ElementTree + +from dateutil import parser as dateutil_parser +from fetchcode.vcs import fetch_via_vcs +import saneyaml + +from vulnerabilities.importer import AdvisoryDataV2 +from vulnerabilities.importer import ReferenceV2 +from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 +from vulnerabilities.utils import get_advisory_url +from vulnerabilities.utils import fetch_response +from vulnerabilities.utils import find_all_cve + +logger = logging.getLogger(__name__) + +CLOUDVULNDB_RSS_URL = "https://www.cloudvulndb.org/rss/feed.xml" + + +class CloudVulnDBImporterPipeline(VulnerableCodeBaseImporterPipelineV2): + """Collect cloud vulnerabilities from CloudVulnDB structured data files.""" + + pipeline_id = "cloudvulndb_importer_v2" + spdx_license_expression = "CC-BY-4.0" + license_url = "https://github.com/wiz-sec/open-cvdb/blob/main/LICENSE.md" + repo_url = "https://github.com/wiz-sec/open-cvdb" + precedence = 200 + + _cached_items = None + + @classmethod + def steps(cls): + return ( + cls.clone, + cls.collect_and_store_advisories, + cls.clean_downloads, + ) + + def clone(self): + self.log(f"Cloning `{self.repo_url}`") + self.vcs_response = fetch_via_vcs(self.repo_url) + + def clean_downloads(self): + if self.vcs_response: + self.log("Removing cloned repository") + self.vcs_response.delete() + + def on_failure(self): + self.clean_downloads() + + def _iter_structured_files(self): + base_directory = Path(self.vcs_response.dest_dir) + + for file_path in base_directory.rglob("*"): + if not file_path.is_file(): + continue + + suffix = file_path.suffix.lower() + if suffix not in (".json", ".yaml", ".yml"): + continue + + yield file_path + + def _load_file_items(self, file_path: Path): + text = file_path.read_text(encoding="utf-8", errors="replace") + suffix = file_path.suffix.lower() + + if 
suffix == ".json": + data = json.loads(text) + else: + data = saneyaml.load(text) + + if isinstance(data, list): + return data + + if isinstance(data, dict): + for key in ("vulnerabilities", "advisories", "items", "data"): + nested = data.get(key) + if isinstance(nested, list): + return nested + return [data] + + return [] + + def get_feed_items(self): + if self._cached_items is None: + response = fetch_response(CLOUDVULNDB_RSS_URL) + self._cached_items = parse_rss_feed(response.text) + return self._cached_items + + def advisories_count(self) -> int: + count = 0 + for file_path in self._iter_structured_files(): + try: + count += len(self._load_file_items(file_path)) + except Exception: + continue + + if count: + return count + + return len(self.get_feed_items()) + + def collect_advisories(self) -> Iterable[AdvisoryDataV2]: + base_directory = Path(self.vcs_response.dest_dir) + structured_count = 0 + + for file_path in self._iter_structured_files(): + try: + items = self._load_file_items(file_path) + except Exception as e: + self.log( + f"Failed to parse structured file {file_path}: {e}", + level=logging.WARNING, + ) + continue + + if not items: + continue + + advisory_url = get_advisory_url( + file=file_path, + base_path=base_directory, + url="https://github.com/wiz-sec/open-cvdb/blob/main/", + ) + + for item in items: + advisory = parse_structured_advisory_data(item=item, advisory_url=advisory_url) + if advisory: + structured_count += 1 + yield advisory + + if structured_count: + return + + self.log("No structured YAML/JSON advisories found, falling back to RSS feed") + for item in self.get_feed_items(): + advisory = parse_rss_advisory_data(item) + if advisory: + yield advisory + + +def parse_structured_advisory_data(item: dict, advisory_url: str): + """ + Parse one structured advisory object from YAML/JSON. + + This parser is intentionally tolerant and can emit advisories without packages, + which is required for SaaS advisories where a PURL may not exist yet. 
+ """ + if not isinstance(item, dict): + return None + + advisory_id = ( + item.get("id") + or item.get("advisory_id") + or item.get("uid") + or item.get("slug") + or item.get("name") + or "" + ) + advisory_id = str(advisory_id).strip() + + title = str(item.get("title") or item.get("summary") or "").strip() + description = str(item.get("description") or item.get("details") or "").strip() + + date_value = item.get("published") or item.get("published_at") or item.get("date") + date_published = None + if date_value: + try: + date_published = dateutil_parser.parse(str(date_value)) + except Exception: + date_published = None + + aliases = [] + alias_candidates = item.get("aliases") + if isinstance(alias_candidates, list): + for alias in alias_candidates: + alias_text = str(alias).strip() + if alias_text: + aliases.extend(find_all_cve(alias_text) or [alias_text]) + + for key in ("cve", "cve_id", "cve_ids"): + value = item.get(key) + if isinstance(value, str): + aliases.extend(find_all_cve(value)) + elif isinstance(value, list): + for entry in value: + aliases.extend(find_all_cve(str(entry))) + + # Structured records often only mention CVEs in free text fields. 
+ aliases.extend(find_all_cve(description)) + aliases.extend(find_all_cve(title)) + + aliases = list(dict.fromkeys([a for a in aliases if a])) + + if not advisory_id: + advisory_id = get_advisory_id( + guid="", + link=advisory_url, + title=title, + pub_date=str(date_value or ""), + ) + + if not advisory_id: + return None + + references = [] + reference_urls = [] + refs = item.get("references") + if isinstance(refs, list): + for ref in refs: + if isinstance(ref, str): + reference_urls.append(ref) + continue + + if isinstance(ref, dict): + for key in ("url", "href", "link"): + if ref.get(key): + reference_urls.append(str(ref.get(key))) + break + + source_url = item.get("url") or item.get("source") or advisory_url + if source_url: + reference_urls.append(str(source_url)) + + for url in list(dict.fromkeys([u.strip() for u in reference_urls if str(u).strip()])): + references.append(ReferenceV2(url=url)) + + summary = title or description or advisory_id + + return AdvisoryDataV2( + advisory_id=advisory_id, + aliases=[alias for alias in aliases if alias != advisory_id], + summary=summary, + affected_packages=[], + references=references, + date_published=date_published, + url=advisory_url, + original_advisory_text=json.dumps(item, indent=2, ensure_ascii=False), + ) + + +def parse_rss_feed(xml_text: str) -> list: + """ + Parse CloudVulnDB RSS XML and return a list of item dictionaries. + Each dictionary has ``title``, ``link``, ``description``, ``pub_date`` and ``guid`` keys. 
+ """ + try: + root = ElementTree.fromstring(xml_text) + except ElementTree.ParseError as e: + logger.error("Failed to parse CloudVulnDB RSS XML: %s", e) + return [] + + channel = root.find("channel") + if channel is None: + logger.error("CloudVulnDB RSS feed has no element") + return [] + + items = [] + for item_el in channel.findall("item"): + items.append( + { + "title": (item_el.findtext("title") or "").strip(), + "link": (item_el.findtext("link") or "").strip(), + "description": (item_el.findtext("description") or "").strip(), + "pub_date": (item_el.findtext("pubDate") or "").strip(), + "guid": (item_el.findtext("guid") or "").strip(), + } + ) + + return items + + +def parse_rss_advisory_data(item: dict): + """ + Parse one CloudVulnDB item and return an AdvisoryDataV2 object. + Since the RSS feed does not provide package/version coordinates, ``affected_packages`` is empty. + """ + title = item.get("title") or "" + link = item.get("link") or "" + description = item.get("description") or "" + pub_date = item.get("pub_date") or "" + guid = item.get("guid") or "" + + advisory_id = get_advisory_id(guid=guid, link=link, title=title, pub_date=pub_date) + if not advisory_id: + logger.error("Skipping advisory with no usable identifier: %r", item) + return None + + aliases = list(dict.fromkeys(find_all_cve(f"{title}\n{description}"))) + aliases = [alias for alias in aliases if alias != advisory_id] + + date_published = None + if pub_date: + try: + date_published = dateutil_parser.parse(pub_date) + except Exception as e: + logger.warning("Could not parse date %r for advisory %s: %s", pub_date, advisory_id, e) + + references = [] + if link: + references.append(ReferenceV2(url=link)) + + summary = title or description + + return AdvisoryDataV2( + advisory_id=advisory_id, + aliases=aliases, + summary=summary, + affected_packages=[], + references=references, + date_published=date_published, + url=link or CLOUDVULNDB_RSS_URL, + original_advisory_text=json.dumps(item, 
indent=2, ensure_ascii=False), + ) + + +# Backward-compatible alias used by existing tests/imports. +parse_advisory_data = parse_rss_advisory_data + + +def get_advisory_id(guid: str, link: str, title: str, pub_date: str) -> str: + """ + Return a stable advisory identifier using the best available source. + Preference order is GUID, link slug, then deterministic content hash fallback. + """ + guid = (guid or "").strip() + if guid: + return guid + + slug = advisory_slug_from_link(link) + if slug: + return slug + + fingerprint_source = "|".join([title.strip(), pub_date.strip()]) + if not fingerprint_source.strip("|"): + return "" + + digest = hashlib.sha256(fingerprint_source.encode("utf-8")).hexdigest()[:16] + return f"cloudvulndb-{digest}" + + +def advisory_slug_from_link(link: str) -> str: + """Extract an advisory slug from a CloudVulnDB URL path.""" + if not link: + return "" + + try: + parsed = urlparse(link) + except Exception: + return "" + + parts = [part for part in parsed.path.split("/") if part] + if not parts: + return "" + + return parts[-1].strip() diff --git a/vulnerabilities/pipelines/v2_importers/enisa_nisa_importer.py b/vulnerabilities/pipelines/v2_importers/enisa_nisa_importer.py new file mode 100644 index 000000000..a7175fb24 --- /dev/null +++ b/vulnerabilities/pipelines/v2_importers/enisa_nisa_importer.py @@ -0,0 +1,187 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. 
+# + +import json +from pathlib import Path +from typing import Iterable + +from fetchcode.vcs import fetch_via_vcs +import saneyaml + +from vulnerabilities.importer import AdvisoryDataV2 +from vulnerabilities.importer import ReferenceV2 +from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2 +from vulnerabilities.utils import find_all_cve +from vulnerabilities.utils import get_advisory_url + + +class EnisaNisaImporterPipeline(VulnerableCodeBaseImporterPipelineV2): + """ + Import ENISA NISA advisories with tolerant parsing. + + This parser is intentionally fault-tolerant: when version mapping is malformed, + it still extracts CVE aliases and URL references. + """ + + pipeline_id = "enisa_nisa_importer_v2" + spdx_license_expression = "CC-BY-4.0" + license_url = "https://www.enisa.europa.eu/" + repo_url = "git+https://github.com/enisaeu/CNW" + + precedence = 200 + + @classmethod + def steps(cls): + return ( + cls.clone, + cls.collect_and_store_advisories, + cls.clean_downloads, + ) + + def clone(self): + self.log(f"Cloning `{self.repo_url}`") + self.vcs_response = fetch_via_vcs(self.repo_url) + + def clean_downloads(self): + if self.vcs_response: + self.log("Removing cloned repository") + self.vcs_response.delete() + + def on_failure(self): + self.clean_downloads() + + def _iter_structured_files(self): + base_directory = Path(self.vcs_response.dest_dir) + for file_path in base_directory.rglob("*"): + if not file_path.is_file(): + continue + + suffix = file_path.suffix.lower() + if suffix not in (".json", ".yaml", ".yml"): + continue + + yield file_path + + def _load_items(self, file_path: Path): + text = file_path.read_text(encoding="utf-8", errors="replace") + suffix = file_path.suffix.lower() + + if suffix == ".json": + data = json.loads(text) + else: + data = saneyaml.load(text) + + if isinstance(data, list): + return data + + if isinstance(data, dict): + for key in ("advisories", "vulnerabilities", "items", "data"): + nested = data.get(key) + 
if isinstance(nested, list): + return nested + return [data] + + return [] + + def advisories_count(self): + count = 0 + for file_path in self._iter_structured_files(): + try: + count += len(self._load_items(file_path)) + except Exception: + continue + return count + + def collect_advisories(self) -> Iterable[AdvisoryDataV2]: + base_directory = Path(self.vcs_response.dest_dir) + + for file_path in self._iter_structured_files(): + try: + items = self._load_items(file_path) + except Exception as e: + self.log(f"Failed to parse {file_path}: {e}") + continue + + advisory_url = get_advisory_url( + file=file_path, + base_path=base_directory, + url="https://github.com/enisaeu/CNW/blob/main/", + ) + + for item in items: + advisory = parse_nisa_advisory(item=item, advisory_url=advisory_url) + if advisory: + yield advisory + + +def parse_nisa_advisory(item: dict, advisory_url: str): + """ + Parse one NISA advisory item. + + This parser is intentionally simple and resilient. If package/version fields are + malformed or unusable, we still emit an advisory with CVEs and references. 
+ """ + if not isinstance(item, dict): + return None + + advisory_id = str(item.get("id") or item.get("advisory_id") or item.get("name") or "").strip() + + summary = str(item.get("summary") or item.get("title") or item.get("description") or "").strip() + + aliases = [] + for field in ("cve", "cve_id", "cve_ids", "aliases"): + value = item.get(field) + if isinstance(value, str): + aliases.extend(find_all_cve(value)) + elif isinstance(value, list): + for entry in value: + aliases.extend(find_all_cve(str(entry))) + + if isinstance(item.get("description"), str): + aliases.extend(find_all_cve(item.get("description"))) + + aliases = list(dict.fromkeys([a for a in aliases if a])) + + if not advisory_id and aliases: + advisory_id = aliases[0] + + if not advisory_id: + return None + + reference_urls = [] + refs = item.get("references") + + if isinstance(refs, list): + for ref in refs: + if isinstance(ref, str): + reference_urls.append(ref) + elif isinstance(ref, dict): + for key in ("url", "link", "href"): + if ref.get(key): + reference_urls.append(str(ref.get(key))) + break + + if item.get("url"): + reference_urls.append(str(item.get("url"))) + + reference_urls.append(advisory_url) + + references = [] + for url in list(dict.fromkeys([u.strip() for u in reference_urls if str(u).strip()])): + references.append(ReferenceV2(url=url)) + + return AdvisoryDataV2( + advisory_id=advisory_id, + aliases=[alias for alias in aliases if alias != advisory_id], + summary=summary or advisory_id, + affected_packages=[], + references=references, + url=advisory_url, + original_advisory_text=json.dumps(item, indent=2, ensure_ascii=False), + ) diff --git a/vulnerabilities/pipelines/v2_importers/redhat_importer.py b/vulnerabilities/pipelines/v2_importers/redhat_importer.py index 5dde4ce8f..b505b13f2 100644 --- a/vulnerabilities/pipelines/v2_importers/redhat_importer.py +++ b/vulnerabilities/pipelines/v2_importers/redhat_importer.py @@ -18,7 +18,6 @@ import dateparser import requests -from 
extractcode import ExtractError from packageurl import PackageURL from univers.version_range import RpmVersionRange from univers.version_range import VersionRange @@ -81,7 +80,7 @@ def fetch(self): f"Error while extracting archive {archive_path}: {errors}", level=logging.ERROR, ) - raise ExtractError(errors) + raise RuntimeError(errors) def advisories_count(self) -> int: return sum(1 for _ in self.location.rglob("*.json")) diff --git a/vulnerabilities/pipes/extractcode_utils.py b/vulnerabilities/pipes/extractcode_utils.py index 037564c30..aceb21369 100644 --- a/vulnerabilities/pipes/extractcode_utils.py +++ b/vulnerabilities/pipes/extractcode_utils.py @@ -7,14 +7,157 @@ # See https://aboutcode.org for more information about nexB OSS projects. # -from extractcode import api +import io +import os +import shutil +import subprocess +import tarfile +import zipfile +from pathlib import Path -def extract_archive(source, destination): - """Extract an archive at `source` to `destination`directory.""" - errors = {} - for event in api.extract_archive(source, destination): - if event.done and event.errors: - errors[str(event.source)] = event.errors +def _safe_destination_path(destination: Path, member_name: str) -> Path: + """ + Return a safe resolved destination path for ``member_name``. + + Raise ValueError if the path is absolute or escapes ``destination``. 
+ """ + member_path = Path(member_name) + if member_path.is_absolute(): + raise ValueError(f"Unsafe absolute path in archive member: {member_name}") + + resolved = (destination / member_path).resolve() + destination_resolved = destination.resolve() + + if os.path.commonpath([str(resolved), str(destination_resolved)]) != str(destination_resolved): + raise ValueError(f"Path traversal attempt in archive member: {member_name}") + + return resolved + + +def _extract_tar_file(tar_file: tarfile.TarFile, destination: Path): + errors = [] + + for member in tar_file.getmembers(): + try: + target = _safe_destination_path(destination, member.name) + + if member.issym() or member.islnk(): + errors.append(f"Skipping symlink member: {member.name}") + continue + + if member.isdir(): + target.mkdir(parents=True, exist_ok=True) + continue + + parent = target.parent + parent.mkdir(parents=True, exist_ok=True) + + source = tar_file.extractfile(member) + if source is None: + continue + + with open(target, "wb") as out: + shutil.copyfileobj(source, out) + except Exception as e: + errors.append(f"Failed extracting TAR member '{member.name}': {e}") return errors + + +def _extract_zip_file(zip_file: zipfile.ZipFile, destination: Path): + errors = [] + + for member in zip_file.infolist(): + try: + target = _safe_destination_path(destination, member.filename) + + if member.is_dir(): + target.mkdir(parents=True, exist_ok=True) + continue + + parent = target.parent + parent.mkdir(parents=True, exist_ok=True) + + with zip_file.open(member, "r") as source, open(target, "wb") as out: + shutil.copyfileobj(source, out) + except Exception as e: + errors.append(f"Failed extracting ZIP member '{member.filename}': {e}") + + return errors + + +def _extract_tar_zst(source_path: Path, destination: Path): + errors = [] + + try: + import zstandard + + with open(source_path, "rb") as src: + dctx = zstandard.ZstdDecompressor() + with dctx.stream_reader(src) as reader: + tar_stream = io.BytesIO(reader.read()) 
+ with tarfile.open(fileobj=tar_stream, mode="r:") as tar_file: + errors.extend(_extract_tar_file(tar_file, destination)) + return errors + except ImportError: + pass + except Exception as e: + errors.append(f"Python zstandard extraction failed: {e}") + + # Fallback to system zstd when Python zstandard is unavailable. + try: + zstd = shutil.which("zstd") + if not zstd: + errors.append("zstd command not found for .tar.zst extraction") + return errors + + result = subprocess.run( + [zstd, "-dc", str(source_path)], + check=False, + capture_output=True, + ) + if result.returncode != 0: + errors.append( + f"zstd extraction failed with code {result.returncode}: {result.stderr.decode('utf-8', errors='ignore')}" + ) + return errors + + tar_stream = io.BytesIO(result.stdout) + with tarfile.open(fileobj=tar_stream, mode="r:") as tar_file: + errors.extend(_extract_tar_file(tar_file, destination)) + except Exception as e: + errors.append(f"System zstd extraction failed: {e}") + + return errors + + +def extract_archive(source, destination): + """Extract ``source`` archive into ``destination`` using secure native extraction.""" + source_path = Path(source) + destination_path = Path(destination) + destination_path.mkdir(parents=True, exist_ok=True) + + errors = [] + + if str(source_path).endswith(".tar.zst"): + errors.extend(_extract_tar_zst(source_path, destination_path)) + elif tarfile.is_tarfile(source_path): + try: + with tarfile.open(source_path, "r:*") as tar_file: + errors.extend(_extract_tar_file(tar_file, destination_path)) + except Exception as e: + errors.append(f"Failed opening TAR archive '{source_path}': {e}") + elif zipfile.is_zipfile(source_path): + try: + with zipfile.ZipFile(source_path, "r") as zip_file: + errors.extend(_extract_zip_file(zip_file, destination_path)) + except Exception as e: + errors.append(f"Failed opening ZIP archive '{source_path}': {e}") + else: + errors.append(f"Unsupported archive format: {source_path}") + + if not errors: + return {} + + 
return {str(source_path): errors} diff --git a/vulnerabilities/tests/pipelines/v2_importers/test_enisa_nisa_importer.py b/vulnerabilities/tests/pipelines/v2_importers/test_enisa_nisa_importer.py new file mode 100644 index 000000000..8d41c874a --- /dev/null +++ b/vulnerabilities/tests/pipelines/v2_importers/test_enisa_nisa_importer.py @@ -0,0 +1,37 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import os + +import django + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "vulnerablecode.settings") +os.environ.setdefault("SECRET_KEY", "test-secret-key") +os.environ.setdefault("ALTCHA_HMAC_KEY", "0123456789abcdef0123456789abcdef") +django.setup() + +from vulnerabilities.pipelines.v2_importers.enisa_nisa_importer import parse_nisa_advisory + + +def test_parse_nisa_advisory_extracts_minimum_cve_and_references(): + raw = { + "title": "NISA bulletin", + "description": "Issue in component foo. CVE-2026-11111", + "references": [{"url": "https://example.com/nisa/bulletin"}], + } + + advisory = parse_nisa_advisory( + item=raw, + advisory_url="https://github.com/enisaeu/CNW/blob/main/data/nisa.yml", + ) + + assert advisory is not None + assert advisory.advisory_id == "CVE-2026-11111" + assert advisory.affected_packages == [] + assert advisory.references diff --git a/vulnerabilities/tests/pipes/test_extractcode_utils.py b/vulnerabilities/tests/pipes/test_extractcode_utils.py new file mode 100644 index 000000000..5ac639d6d --- /dev/null +++ b/vulnerabilities/tests/pipes/test_extractcode_utils.py @@ -0,0 +1,45 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. 
+# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +import io +import tarfile +from pathlib import Path + +from vulnerabilities.pipes import extractcode_utils + + +def _build_tar_with_member(tar_path: Path, member_name: str, content: bytes): + with tarfile.open(tar_path, "w") as tar: + info = tarfile.TarInfo(name=member_name) + info.size = len(content) + tar.addfile(info, io.BytesIO(content)) + + +def test_extract_archive_blocks_path_traversal(tmp_path): + archive = tmp_path / "sample.tar" + output = tmp_path / "out" + + _build_tar_with_member(archive, "../../escape.txt", b"oops") + + errors = extractcode_utils.extract_archive(source=archive, destination=output) + + assert str(archive) in errors + assert not (tmp_path / "escape.txt").exists() + + +def test_extract_archive_extracts_safe_files(tmp_path): + archive = tmp_path / "safe.tar" + output = tmp_path / "out" + + _build_tar_with_member(archive, "nested/file.txt", b"ok") + + errors = extractcode_utils.extract_archive(source=archive, destination=output) + + assert errors == {} + assert (output / "nested" / "file.txt").read_bytes() == b"ok" diff --git a/vulnerabilities/tests/test_cloudvulndb_importer.py b/vulnerabilities/tests/test_cloudvulndb_importer.py new file mode 100644 index 000000000..76e7f59c9 --- /dev/null +++ b/vulnerabilities/tests/test_cloudvulndb_importer.py @@ -0,0 +1,91 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. 
+# + +import os +from unittest import TestCase + +import django + +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "vulnerablecode.settings") +os.environ.setdefault("SECRET_KEY", "test-secret-key") +os.environ.setdefault("ALTCHA_HMAC_KEY", "0123456789abcdef0123456789abcdef") +django.setup() + +from vulnerabilities.pipelines.v2_importers.cloudvulndb_importer import advisory_slug_from_link +from vulnerabilities.pipelines.v2_importers.cloudvulndb_importer import get_advisory_id +from vulnerabilities.pipelines.v2_importers.cloudvulndb_importer import parse_advisory_data +from vulnerabilities.pipelines.v2_importers.cloudvulndb_importer import parse_structured_advisory_data +from vulnerabilities.pipelines.v2_importers.cloudvulndb_importer import parse_rss_feed +from vulnerabilities.tests import util_tests + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) +TEST_DATA = os.path.join(BASE_DIR, "test_data/cloudvulndb") + + +def _load_rss(filename="cloudvulndb_rss_mock.xml"): + with open(os.path.join(TEST_DATA, filename), encoding="utf-8") as f: + return f.read() + + +class TestCloudVulnDBImporter(TestCase): + def test_parse_rss_feed_returns_correct_item_count(self): + items = parse_rss_feed(_load_rss()) + self.assertEqual(len(items), 2) + + def test_parse_advisory_with_guid_and_cves(self): + items = parse_rss_feed(_load_rss()) + result = parse_advisory_data(items[0]) + self.assertIsNotNone(result) + result_dict = result.to_dict() + expected_file = os.path.join(TEST_DATA, "expected_cloudvulndb_advisory_output1.json") + util_tests.check_results_against_json(result_dict, expected_file) + + def test_parse_advisory_without_guid_falls_back_to_link_slug(self): + items = parse_rss_feed(_load_rss()) + result = parse_advisory_data(items[1]) + self.assertIsNotNone(result) + self.assertEqual(result.advisory_id, "azure-imds-ssrf") + self.assertEqual(result.aliases, []) + + def test_get_advisory_id_hash_fallback(self): + advisory_id = get_advisory_id( + guid="", + link="", + 
title="Example advisory title", + pub_date="Mon, 08 Jul 2024 00:00:00 GMT", + ) + self.assertTrue(advisory_id.startswith("cloudvulndb-")) + self.assertEqual(len(advisory_id), len("cloudvulndb-") + 16) + + def test_parse_rss_feed_invalid_xml_returns_empty(self): + result = parse_rss_feed("not valid xml <>>>") + self.assertEqual(result, []) + + def test_advisory_slug_from_link(self): + slug = advisory_slug_from_link("https://www.cloudvulndb.org/vulnerabilities/aws-example/") + self.assertEqual(slug, "aws-example") + + def test_parse_structured_advisory_without_purl(self): + structured = { + "id": "CLOUD-2026-0001", + "title": "Azure Entra ID token validation issue", + "description": "Impacts Azure Entra ID service. CVE-2026-12345", + "references": [{"url": "https://example.com/cloud/advisory-1"}], + } + + advisory = parse_structured_advisory_data( + item=structured, + advisory_url="https://github.com/wiz-sec/open-cvdb/blob/main/advisories/sample.yaml", + ) + + self.assertIsNotNone(advisory) + self.assertEqual(advisory.advisory_id, "CLOUD-2026-0001") + self.assertIn("CVE-2026-12345", advisory.aliases) + self.assertEqual(advisory.affected_packages, []) + self.assertGreaterEqual(len(advisory.references), 1) diff --git a/vulnerabilities/tests/test_data/cloudvulndb/cloudvulndb_rss_mock.xml b/vulnerabilities/tests/test_data/cloudvulndb/cloudvulndb_rss_mock.xml new file mode 100644 index 000000000..1d2421e57 --- /dev/null +++ b/vulnerabilities/tests/test_data/cloudvulndb/cloudvulndb_rss_mock.xml @@ -0,0 +1,22 @@ + + + + CloudVulnDB RSS + https://www.cloudvulndb.org + Cloud vulnerabilities and security issues + + <![CDATA[AWS Example Privilege Escalation (CVE-2024-11111)]]> + https://www.cloudvulndb.org/vulnerabilities/aws-example-privilege-escalation + CLOUD-2024-0001 + Tue, 04 Jun 2024 12:30:00 GMT + + + + <![CDATA[Azure IMDS SSRF Exposure]]> + https://www.cloudvulndb.org/vulnerabilities/azure-imds-ssrf + + Fri, 05 Jul 2024 08:00:00 GMT + + + + diff --git 
a/vulnerabilities/tests/test_data/cloudvulndb/expected_cloudvulndb_advisory_output1.json b/vulnerabilities/tests/test_data/cloudvulndb/expected_cloudvulndb_advisory_output1.json new file mode 100644 index 000000000..8baf2b463 --- /dev/null +++ b/vulnerabilities/tests/test_data/cloudvulndb/expected_cloudvulndb_advisory_output1.json @@ -0,0 +1,21 @@ +{ + "advisory_id": "CLOUD-2024-0001", + "aliases": [ + "CVE-2024-11111", + "CVE-2024-22222" + ], + "summary": "AWS Example Privilege Escalation (CVE-2024-11111)", + "affected_packages": [], + "references": [ + { + "reference_id": "", + "reference_type": "", + "url": "https://www.cloudvulndb.org/vulnerabilities/aws-example-privilege-escalation" + } + ], + "patches": [], + "severities": [], + "date_published": "2024-06-04T12:30:00+00:00", + "weaknesses": [], + "url": "https://www.cloudvulndb.org/vulnerabilities/aws-example-privilege-escalation" +}