|
| 1 | +# Copyright (C) 2025 Intel Corporation |
| 2 | +# SPDX-License-Identifier: GPL-3.0-or-later |
| 3 | + |
| 4 | +import os |
| 5 | +from collections import defaultdict |
| 6 | +from typing import Any, DefaultDict, Dict, Set, Union |
| 7 | + |
| 8 | +from lib4vex.generator import VEXGenerator |
| 9 | +from lib4vex.parser import VEXParser |
| 10 | + |
| 11 | +from cve_bin_tool.log import LOGGER |
| 12 | +from cve_bin_tool.util import ProductInfo, Remarks, decode_bom_ref, decode_purl |
| 13 | +from lib4vex import Validator as VEXValidator |
| 14 | + |
| 15 | +TriageData = Dict[str, Union[Dict[str, Any], Set[str]]] |
| 16 | + |
| 17 | + |
| 18 | +class VexHandler: |
| 19 | + """ |
| 20 | + A centralized handler class for all VEX format operations. |
| 21 | + Supports CSAF, CycloneDX, and OpenVEX formats. |
| 22 | +
|
| 23 | + This class uses lib4vex for parsing, validation, and generation of VEX documents. |
| 24 | +
|
| 25 | + Attributes: |
| 26 | + logger: Logger for logging information. |
| 27 | + analysis_state: Mapping between VEX format states and internal Remarks. |
| 28 | + """ |
| 29 | + |
| 30 | + # Mapping between different VEX format states and internal Remarks |
| 31 | + analysis_state = { |
| 32 | + "cyclonedx": { |
| 33 | + "in_triage": Remarks.NewFound, |
| 34 | + "exploitable": Remarks.Confirmed, |
| 35 | + "resolved": Remarks.Mitigated, |
| 36 | + "false_positive": Remarks.FalsePositive, |
| 37 | + "not_affected": Remarks.NotAffected, |
| 38 | + }, |
| 39 | + "csaf": { |
| 40 | + "first_affected": Remarks.NewFound, |
| 41 | + "first_fixed": Remarks.Mitigated, |
| 42 | + "fixed": Remarks.Mitigated, |
| 43 | + "known_affected": Remarks.Confirmed, |
| 44 | + "known_not_affected": Remarks.NotAffected, |
| 45 | + "last_affected": Remarks.Confirmed, |
| 46 | + "recommended": Remarks.Mitigated, |
| 47 | + "under_investigation": Remarks.NewFound, |
| 48 | + }, |
| 49 | + "openvex": { |
| 50 | + "not_affected": Remarks.NotAffected, |
| 51 | + "affected": Remarks.Confirmed, |
| 52 | + "fixed": Remarks.Mitigated, |
| 53 | + "under_investigation": Remarks.NewFound, |
| 54 | + }, |
| 55 | + } |
| 56 | + |
| 57 | + def __init__(self, logger=None): |
| 58 | + """ |
| 59 | + Initialize the VexHandler. |
| 60 | +
|
| 61 | + Args: |
| 62 | + logger: Optional logger to use. Defaults to a new child logger. |
| 63 | + """ |
| 64 | + self.logger = logger or LOGGER.getChild(self.__class__.__name__) |
| 65 | + |
| 66 | + def parse( |
| 67 | + self, filename: str, vextype: str = "auto" |
| 68 | + ) -> DefaultDict[ProductInfo, TriageData]: |
| 69 | + """ |
| 70 | + Parse a VEX file and extract the necessary information. |
| 71 | +
|
| 72 | + Args: |
| 73 | + filename: Path to the VEX file. |
| 74 | + vextype: Type of VEX file. Can be 'cyclonedx', 'csaf', 'openvex', or 'auto' for automatic detection. |
| 75 | +
|
| 76 | + Returns: |
| 77 | + Dictionary mapping ProductInfo to vulnerability data. |
| 78 | + """ |
| 79 | + if not os.path.isfile(filename): |
| 80 | + self.logger.error(f"VEX file not found: {filename}") |
| 81 | + return defaultdict(dict) |
| 82 | + |
| 83 | + try: |
| 84 | + vexparser = VEXParser(vex_type=vextype) |
| 85 | + vexparser.parse(filename) |
| 86 | + |
| 87 | + # Get the detected type if auto was specified |
| 88 | + if vextype == "auto": |
| 89 | + vextype = vexparser.get_type() |
| 90 | + |
| 91 | + self.logger.info(f"Parsed VEX file: {filename} of type: {vextype}") |
| 92 | + |
| 93 | + return self._process_parsed_data(vexparser, vextype) |
| 94 | + |
| 95 | + except Exception as e: |
| 96 | + self.logger.error(f"Error parsing VEX file {filename}: {str(e)}") |
| 97 | + return defaultdict(dict) |
| 98 | + |
| 99 | + def validate(self, filename: str, vextype: str = "auto") -> bool: |
| 100 | + """ |
| 101 | + Validate a VEX file against its schema. |
| 102 | +
|
| 103 | + Args: |
| 104 | + filename: Path to the VEX file. |
| 105 | + vextype: Type of VEX file. Can be 'cyclonedx', 'csaf', 'openvex', or 'auto' for automatic detection. |
| 106 | +
|
| 107 | + Returns: |
| 108 | + True if the file is valid, False otherwise. |
| 109 | + """ |
| 110 | + if not os.path.isfile(filename): |
| 111 | + self.logger.error(f"VEX file not found: {filename}") |
| 112 | + return False |
| 113 | + |
| 114 | + try: |
| 115 | + validator = VEXValidator(vex_type=vextype) |
| 116 | + is_valid = validator.validate(filename) |
| 117 | + |
| 118 | + if is_valid: |
| 119 | + self.logger.info(f"VEX file {filename} is valid") |
| 120 | + else: |
| 121 | + self.logger.error(f"VEX file {filename} is invalid") |
| 122 | + |
| 123 | + return is_valid |
| 124 | + |
| 125 | + except Exception as e: |
| 126 | + self.logger.error(f"Error validating VEX file {filename}: {str(e)}") |
| 127 | + return False |
| 128 | + |
| 129 | + def generate(self, data: Dict, output_file: str, vextype: str) -> bool: |
| 130 | + """ |
| 131 | + Generate a VEX document from data. |
| 132 | +
|
| 133 | + Args: |
| 134 | + data: Data to include in the VEX document. |
| 135 | + output_file: Path where to save the generated VEX document. |
| 136 | + vextype: Type of VEX document to generate ('cyclonedx', 'csaf', or 'openvex'). |
| 137 | +
|
| 138 | + Returns: |
| 139 | + True if the file was successfully generated, False otherwise. |
| 140 | + """ |
| 141 | + try: |
| 142 | + generator = VEXGenerator(vex_type=vextype) |
| 143 | + generator.generate(data, output_file) |
| 144 | + self.logger.info(f"Generated {vextype} VEX file: {output_file}") |
| 145 | + return True |
| 146 | + |
| 147 | + except Exception as e: |
| 148 | + self.logger.error(f"Error generating VEX file {output_file}: {str(e)}") |
| 149 | + return False |
| 150 | + |
| 151 | + def convert( |
| 152 | + self, |
| 153 | + input_file: str, |
| 154 | + output_file: str, |
| 155 | + from_type: str = "auto", |
| 156 | + to_type: str = "cyclonedx", |
| 157 | + ) -> bool: |
| 158 | + """ |
| 159 | + Convert a VEX file from one format to another. |
| 160 | +
|
| 161 | + Args: |
| 162 | + input_file: Path to the input VEX file. |
| 163 | + output_file: Path where to save the converted VEX document. |
| 164 | + from_type: Type of input VEX file. Can be 'cyclonedx', 'csaf', 'openvex', or 'auto'. |
| 165 | + to_type: Type of output VEX file. Can be 'cyclonedx', 'csaf', or 'openvex'. |
| 166 | +
|
| 167 | + Returns: |
| 168 | + True if conversion was successful, False otherwise. |
| 169 | + """ |
| 170 | + try: |
| 171 | + # Parse the input file |
| 172 | + parsed_data = self.parse(input_file, from_type) |
| 173 | + if not parsed_data: |
| 174 | + return False |
| 175 | + |
| 176 | + # Convert to the target format and generate the output file |
| 177 | + return self.generate(parsed_data, output_file, to_type) |
| 178 | + |
| 179 | + except Exception as e: |
| 180 | + self.logger.error( |
| 181 | + f"Error converting VEX file {input_file} to {to_type}: {str(e)}" |
| 182 | + ) |
| 183 | + return False |
| 184 | + |
| 185 | + def _process_parsed_data( |
| 186 | + self, vexparser, vextype: str |
| 187 | + ) -> DefaultDict[ProductInfo, TriageData]: |
| 188 | + """ |
| 189 | + Process the parsed VEX data and extract the necessary information. |
| 190 | +
|
| 191 | + This method performs the following steps: |
| 192 | + 1. Extracts metadata, product information, and vulnerabilities from the parser |
| 193 | + 2. Iterates through each vulnerability to extract key details like ID, status, justification |
| 194 | + 3. Maps VEX format-specific status values to internal Remarks |
| 195 | + 4. Decodes product identifiers (bom_ref or purl) to obtain consistent ProductInfo objects |
| 196 | + 5. Collects all vulnerability data per product |
| 197 | +
|
| 198 | + Args: |
| 199 | + vexparser: The VEXParser object with parsed data. |
| 200 | + vextype: The type of VEX document ('cyclonedx', 'csaf', or 'openvex'). |
| 201 | +
|
| 202 | + Returns: |
| 203 | + DefaultDict mapping ProductInfo objects to their vulnerability data. |
| 204 | + """ |
| 205 | + parsed_data = defaultdict(dict) |
| 206 | + serialNumbers = set() |
| 207 | + vulnerabilities = vexparser.get_vulnerabilities() |
| 208 | + metadata = vexparser.get_metadata() |
| 209 | + product = vexparser.get_product() |
| 210 | + |
| 211 | + # Extract product info based on VEX type but not used directly in this method |
| 212 | + # Just stored for future extensions or reference |
| 213 | + _ = self._extract_product_info(vextype, metadata, product) |
| 214 | + |
| 215 | + # Process vulnerabilities |
| 216 | + for vuln in vulnerabilities: |
| 217 | + # Extract necessary fields from the vulnerability |
| 218 | + cve_id = vuln.get("id") |
| 219 | + remarks = self.analysis_state[vextype][vuln.get("status")] |
| 220 | + justification = vuln.get("justification") |
| 221 | + response = vuln.get("remediation") |
| 222 | + comments = vuln.get("comment") |
| 223 | + |
| 224 | + # If the comment doesn't already have the justification prepended, add it |
| 225 | + if comments and justification and not comments.startswith(justification): |
| 226 | + comments = f"{justification}: {comments}" |
| 227 | + |
| 228 | + severity = vuln.get("severity") |
| 229 | + |
| 230 | + # Decode the bom reference or purl based on VEX type |
| 231 | + product_info = None |
| 232 | + serialNumber = "" |
| 233 | + if vextype == "cyclonedx": |
| 234 | + decoded_ref = decode_bom_ref(vuln.get("bom_link")) |
| 235 | + if isinstance(decoded_ref, tuple) and not isinstance( |
| 236 | + decoded_ref, ProductInfo |
| 237 | + ): |
| 238 | + product_info, serialNumber = decoded_ref |
| 239 | + serialNumbers.add(serialNumber) |
| 240 | + else: |
| 241 | + product_info = decoded_ref |
| 242 | + elif vextype in ["openvex", "csaf"]: |
| 243 | + product_info = decode_purl(vuln.get("purl")) |
| 244 | + |
| 245 | + if product_info: |
| 246 | + cve_data = { |
| 247 | + "remarks": remarks, |
| 248 | + "comments": comments if comments else "", |
| 249 | + "response": response if response else [], |
| 250 | + } |
| 251 | + if justification: |
| 252 | + cve_data["justification"] = justification.strip() |
| 253 | + |
| 254 | + if severity: |
| 255 | + cve_data["severity"] = severity.strip() |
| 256 | + |
| 257 | + parsed_data[product_info][cve_id.strip()] = cve_data |
| 258 | + |
| 259 | + if "paths" not in parsed_data[product_info]: |
| 260 | + parsed_data[product_info]["paths"] = {} |
| 261 | + |
| 262 | + self.logger.debug(f"Parsed VEX data: {parsed_data}") |
| 263 | + return parsed_data |
| 264 | + |
| 265 | + def _extract_product_info( |
| 266 | + self, vextype: str, metadata: Dict, product: Dict |
| 267 | + ) -> Dict[str, str]: |
| 268 | + """ |
| 269 | + Extract product information from the parsed VEX file. |
| 270 | +
|
| 271 | + Args: |
| 272 | + vextype: Type of VEX document. |
| 273 | + metadata: Metadata from the VEX document. |
| 274 | + product: Product information from the VEX document. |
| 275 | +
|
| 276 | + Returns: |
| 277 | + Dictionary with product information. |
| 278 | + """ |
| 279 | + product_info = {} |
| 280 | + if vextype == "cyclonedx": |
| 281 | + # release and vendor is not available in cyclonedx |
| 282 | + product_info["product"] = metadata.get("name") |
| 283 | + product_info["release"] = "" |
| 284 | + product_info["vendor"] = "" |
| 285 | + elif vextype == "csaf": |
| 286 | + csaf_product = product.get("CSAFPID_0001", {}) |
| 287 | + if csaf_product: |
| 288 | + product_info["product"] = csaf_product.get("product") |
| 289 | + product_info["release"] = csaf_product.get("version") |
| 290 | + product_info["vendor"] = csaf_product.get("vendor") |
| 291 | + elif vextype == "openvex": |
| 292 | + # product and release is not available in openvex |
| 293 | + product_info["product"] = "" |
| 294 | + product_info["release"] = "" |
| 295 | + product_info["vendor"] = metadata.get("author") |
| 296 | + |
| 297 | + return product_info |
0 commit comments