| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- #!/usr/bin/env python3
- # SPDX-License-Identifier: GPL-2.0-or-later
- #
- # Enriches the input CycloneDX SBOM with vulnerability information from the NVD
- # database.
- #
- # The NVD database is cloned using a mirror of it and the content is compared
- # locally.
- #
- # Example usage:
- # $ make show-info | utils/generate-cyclonedx | support/script/cve-check --nvd-path dl/buildroot-nvd/
- from collections import defaultdict
- from pathlib import Path
- from typing import TypedDict
- import argparse
- import sys
- import json
- import cve as cvecheck
- class Options(TypedDict, total=True):
- include_resolved: bool
- DESCRIPTION = """
- Enriches the input CycloneDX SBOM with vulnerability information from the NVD
- database.
- The NVD database is cloned using a mirror of it and the content is compared
- locally.
- """
- brpath = Path(__file__).parent.parent.parent
- def cve_api_get_lang_from_list(values, lang="en") -> (str | None):
- for x in values:
- if x.get("lang") == lang:
- return x.get("value")
- return None
- def nvd_cve_weaknesses_to_cdx(weaknesses) -> list[int]:
- """
- See the CycloneDX specification for 'cwes' [1]
- [1] https://cyclonedx.org/docs/1.6/json/#vulnerabilities_items_cwes
- """
- res = []
- for node in weaknesses:
- value = cve_api_get_lang_from_list(node.get("description", []))
- if value is None:
- continue
- cwe = value.replace("CWE-", "")
- if not cwe.isnumeric():
- continue
- res.append(int(cwe))
- return res
- def nvd_cve_cvss_to_cdx(metrics):
- """
- See the CycloneDX specification for 'ratings' [1]
- [1] https://cyclonedx.org/docs/1.6/json/#vulnerabilities_items_ratings
- """
- KEY_METHOD_DICT = {
- "cvssMetricV40": "CVSSv4",
- "cvssMetricV31": "CVSSv31",
- "cvssMetricV3": "CVSSv3",
- "cvssMetricV2": "CVSSv2"
- }
- res = []
- for key, values in metrics.items():
- for value in values:
- data = value.get("cvssData", {})
- res.append({
- "method": KEY_METHOD_DICT.get(key, "other"),
- **({
- "score": data["baseScore"],
- } if "baseScore" in data else {}),
- **({
- "severity": data["baseSeverity"].lower(),
- } if "baseSeverity" in data else {}),
- **({
- "vector": data["vectorString"],
- } if "vectorString" in data else {}),
- })
- return res
- def nvd_cve_references_to_cdx(references):
- advisories = []
- for ref in references:
- if not {"url", "tags"}.issubset(ref):
- continue
- tags = ref["tags"]
- if not isinstance(tags, list) or len(tags) == 0:
- continue
- advisories.append({
- "title": next((t for t in tags if "Advisory" not in t), tags[0]),
- "url": ref["url"]
- })
- return advisories
- def nvd_cve_to_cdx_vulnerability(nvd_cve):
- """
- Turns the CVE object fetched from the NVD API into a CycloneDX
- vulnerability that fits the spec (see [1]).
- [1] https://cyclonedx.org/docs/1.6/json/#vulnerabilities
- """
- vulnerability = {
- "bom-ref": nvd_cve["id"],
- "id": nvd_cve["id"],
- "description": cve_api_get_lang_from_list(nvd_cve.get("descriptions", [])) or "",
- "source": {
- "name": "NVD",
- "url": "https://nvd.nist.gov/"
- },
- **({
- "published": nvd_cve["published"],
- } if "published" in nvd_cve else {}),
- **({
- "updated": nvd_cve["lastModified"],
- } if "lastModified" in nvd_cve else {}),
- **({
- "cwes": nvd_cve_weaknesses_to_cdx(nvd_cve["weaknesses"]),
- } if "weaknesses" in nvd_cve else {}),
- **({
- "ratings": nvd_cve_cvss_to_cdx(nvd_cve["metrics"]),
- } if "metrics" in nvd_cve else {}),
- **({
- "advisories": nvd_cve_references_to_cdx(nvd_cve["references"]),
- } if "references" in nvd_cve else {}),
- }
- return vulnerability
- def vuln_append_or_update_affects_if_exists(vulnerabilities, vulnerability):
- """
- Append 'vulnerability' passed as argument to the 'vulnerabilities' argument
- if an entry with the same 'id' doesn't exist yet.
- If the vulnerability already exists, the input reference is added to the
- 'affects' list of the existing entry.
- Args:
- vulnerabilities (list): The vulnerabilities array reference retrieved
- from the input CycloneDX SBOM
- vulnerability (dict): Vulnerability to add to the 'vulnerabilities' list.
- """
- # Search if a vulnerability with the same identifier already exists in the
- # SBOM vulnerability list.
- matching_vuln = next(
- (vuln for vuln in vulnerabilities if vuln.get("id") == vulnerability["id"]),
- None
- )
- # bom-ref to the component is passed to the affects of the vulnerability
- # passed as argument
- bom_ref = next((a["ref"] for a in vulnerability.get("affects", [])), None)
- if matching_vuln is not None:
- # Remove the affect to not use it while updating matching vuln.
- if "affects" in vulnerability:
- del vulnerability["affects"]
- if matching_vuln.get("analysis") is not None and "analysis" in vulnerability:
- # We don't update vulnerabilities that already have an
- # 'analysis'.
- # Buildroot ignored vulnerabilities will already have
- # an analysis and need to remain as such.
- del vulnerability["analysis"]
- affects = matching_vuln.setdefault("affects", [])
- if bom_ref is not None:
- ref = next((a["ref"] for a in affects if a["ref"] == bom_ref), None)
- if ref is None:
- # Add a 'ref' (bom reference) to the component if not
- # already present in the 'affects' list.
- affects.append({
- "ref": bom_ref
- })
- # Update the metadata of the vulnerability with the one
- # downloaded from the database.
- matching_vuln.update(vulnerability)
- else:
- vulnerabilities.append(vulnerability)
- def check_package_cve_affects(cve: cvecheck.CVE, cpe_product_pkgs, sbom, opt: Options):
- vulnerabilities = sbom.setdefault("vulnerabilities", [])
- for product in cve.affected_products:
- for comp in cpe_product_pkgs.get(product, []):
- cve_status = cve.affects(comp["name"], comp["version"], comp["cpe"])
- if cve_status == cve.CVE_UNKNOWN:
- continue
- if cve_status == cve.CVE_DOESNT_AFFECT and not opt["include_resolved"]:
- continue
- vulnerability = nvd_cve_to_cdx_vulnerability(cve.nvd_cve)
- vulnerability["analysis"] = {
- "state": "exploitable" if cve_status == cve.CVE_AFFECTS else "resolved"
- }
- vulnerability["affects"] = [{
- "ref": comp["bom-ref"]
- }]
- vuln_append_or_update_affects_if_exists(vulnerabilities, vulnerability)
- def check_package_cves(nvd_path: Path, sbom, opt: Options):
- """
- Iterate over every entry of the NVD API mirror. Each vulnerability is
- compared to the set of components passed as argument in the 'sbom'.
- The vulnerabilities set of that 'sbom' argument is enriched with analysis
- of vulnerabilities that match that set of components.
- Args:
- nvd_path (Path): Path of the mirror of the NVD API.
- sbom (dict): Input SBOM containing a set of vulnerabilities that will be enriched.
- opt (Options): Options for the analysis.
- """
- cpe_product_pkgs = defaultdict(list)
- for comp in sbom.get("components", []):
- if comp.get("cpe") and comp.get("version"):
- cpe_product = cvecheck.CPE(comp["cpe"]).product
- cpe_product_pkgs[cpe_product].append(comp)
- for cve in cvecheck.CVE.read_nvd_dir(nvd_path):
- check_package_cve_affects(cve, cpe_product_pkgs, sbom, opt)
- def enrich_vulnerabilities(nvd_path: Path, sbom):
- """
- Iterate over the vulnerabilities present in the 'sbom' passed as arguments
- and enrich the vulnerability with content from the NVD API mirror.
- Args:
- nvd_path (Path): Path of the mirror of the NVD API.
- sbom (dict): Input SBOM containing a set of vulnerabilities that will be enriched.
- """
- vulnerabilities = sbom.setdefault("vulnerabilities", [])
- for vuln in vulnerabilities:
- vuln_id = vuln.get("id")
- if vuln_id is None or not vuln_id.upper().startswith("CVE-"):
- continue
- cve = cvecheck.CVE.read_nvd_entry(nvd_path, vuln_id)
- if cve is None:
- print(f"Warning: '{vuln_id}' doesn't exist in NVD database.", file=sys.stderr)
- continue
- vulnerability = nvd_cve_to_cdx_vulnerability(cve.nvd_cve)
- vuln_append_or_update_affects_if_exists(vulnerabilities, vulnerability)
- def main():
- parser = argparse.ArgumentParser(description=DESCRIPTION)
- parser.add_argument("-i", "--in-file", nargs="?", type=argparse.FileType("r"),
- default=(None if sys.stdin.isatty() else sys.stdin))
- parser.add_argument("-o", "--out-file", nargs="?", type=argparse.FileType("w"),
- default=sys.stdout)
- parser.add_argument('--nvd-path', dest='nvd_path',
- default=brpath / 'dl' / 'buildroot-nvd',
- help='Path to the local NVD database',
- type=lambda p: Path(p).expanduser().resolve())
- parser.add_argument("--enrich-only", default=False, action='store_true',
- help="Only update metadata for the vulnerabilities currently present " +
- "in the input CycloneDX SBOM. Don't do an analysis.")
- parser.add_argument("--include-resolved", default=False, action='store_true',
- help="Add vulnerabilities already 'resolved' that don't affect a " +
- "component to the output CycloneDX vulnerabilities analysis.")
- parser.add_argument("--no-nvd-update", default=False, action='store_true',
- help="Doesn't update the NVD database.")
- args = parser.parse_args()
- if args.in_file is None or args.nvd_path is None:
- parser.print_help()
- sys.exit(1)
- sbom = json.load(args.in_file)
- opt = Options(
- include_resolved=args.include_resolved,
- )
- args.nvd_path.mkdir(parents=True, exist_ok=True)
- if not args.no_nvd_update:
- cvecheck.CVE.download_nvd(args.nvd_path)
- if args.enrich_only:
- enrich_vulnerabilities(args.nvd_path, sbom)
- else:
- check_package_cves(args.nvd_path, sbom, opt)
- args.out_file.write(json.dumps(sbom, indent=2))
- args.out_file.write('\n')
- if __name__ == "__main__":
- main()
|