|
| 1 | +import csv |
| 2 | +import io |
| 3 | +import json |
| 4 | + |
| 5 | +import cvss.parser |
| 6 | +from cvss.cvss3 import CVSS3 |
| 7 | + |
| 8 | +from dojo.models import Finding |
| 9 | +from dojo.tools.sysdig_common.sysdig_data import SysdigData |
| 10 | + |
| 11 | + |
| 12 | +class SysdigCLIParser: |
| 13 | + |
| 14 | + """Sysdig CLI Report Importer - Runtime CSV""" |
| 15 | + |
| 16 | + def get_scan_types(self): |
| 17 | + return ["Sysdig CLI Report"] |
| 18 | + |
| 19 | + def get_label_for_scan_types(self, scan_type): |
| 20 | + return "Sysdig CLI Report Scan" |
| 21 | + |
| 22 | + def get_description_for_scan_types(self, scan_type): |
| 23 | + return "Import of Sysdig Report generated by the Sysdig CLI scanner" |
| 24 | + |
| 25 | + def get_findings(self, filename, test): |
| 26 | + if filename is None: |
| 27 | + return () |
| 28 | + if filename.name.lower().endswith(".csv"): |
| 29 | + arr_data = self.load_csv(filename) |
| 30 | + return self.parse_csv(arr_data=arr_data, test=test) |
| 31 | + if filename.name.lower().endswith(".json"): |
| 32 | + scan_data = filename.read() |
| 33 | + try: |
| 34 | + data = json.loads(str(scan_data, "utf-8")) |
| 35 | + except Exception: |
| 36 | + data = json.loads(scan_data) |
| 37 | + if "result" in data: |
| 38 | + return self.parse_json(data=data, test=test) |
| 39 | + |
| 40 | + if "data" in data: |
| 41 | + msg = "JSON file is not in the expected format, it looks like a Sysdig Vulnerability Report." |
| 42 | + else: |
| 43 | + msg = "JSON file is not in the expected format, expected data result element" |
| 44 | + |
| 45 | + raise ValueError(msg) |
| 46 | + return () |
| 47 | + |
| 48 | + def parse_json(self, data, test): |
| 49 | + findings = [] |
| 50 | + packages = data.get("result", {}).get("packages", []) |
| 51 | + |
| 52 | + for package in packages: |
| 53 | + # print(package) |
| 54 | + packageName = package.get("name", "") |
| 55 | + packageType = package.get("type", "") |
| 56 | + packagePath = package.get("path", "") |
| 57 | + packageVersion = package.get("version", "") |
| 58 | + packageSuggestedFix = package.get("suggestedFix", "") |
| 59 | + layerDigest = package.get("layerDigest", "") |
| 60 | + |
| 61 | + vulns = package.get("vulns", []) |
| 62 | + # print("vulns: %s" % vulns) |
| 63 | + for item in vulns: |
| 64 | + # print("item: %s" % item) |
| 65 | + vulnName = item.get("name", "") |
| 66 | + vulnSeverity = item.get("severity", {}).get("value", "") |
| 67 | + vulnCvssScore = item.get("cvssScore", {}).get("value", {}).get("score", "") |
| 68 | + vulnCvssVersion = item.get("cvssScore", {}).get("value", {}).get("version", "") |
| 69 | + vulnCvssVector = item.get("cvssScore", {}).get("value", {}).get("vector", "") |
| 70 | + vulnDisclosureDate = item.get("disclosureDate", "") |
| 71 | + vulnSolutionDate = item.get("solutionDate", "") |
| 72 | + vulnPublishedByVendorDate = item.get("publishDateByVendor", {}).get("nvd", "") |
| 73 | + vulnExploitable = item.get("exploitable", "") |
| 74 | + vulnFixVersion = item.get("fixedInVersion", "") |
| 75 | + |
| 76 | + description = "" |
| 77 | + description += "vulnCvssVersion: " + vulnCvssVersion + "\n" |
| 78 | + description += "vulnCvssScore: " + str(vulnCvssScore) + "\n" |
| 79 | + description += "vulnCvssVector: " + vulnCvssVector + "\n" |
| 80 | + description += "vulnDisclosureDate: " + vulnDisclosureDate + "\n" |
| 81 | + description += "vulnPublishedByVendorDate: " + vulnPublishedByVendorDate + "\n" |
| 82 | + description += "vulnSolutionDate: " + vulnSolutionDate + "\n" |
| 83 | + description += "vulnExploitable: " + str(vulnExploitable) + "\n" |
| 84 | + description += "packageName: " + packageName + "\n" |
| 85 | + description += "packageType: " + packageType + "\n" |
| 86 | + description += "packagePath: " + packagePath + "\n" |
| 87 | + description += "packageVersion: " + packageVersion + "\n" |
| 88 | + description += "packageSuggestedFix: " + packageSuggestedFix + "\n" |
| 89 | + description += "layerDigest: " + layerDigest + "\n" |
| 90 | + |
| 91 | + mitigation = "" |
| 92 | + mitigation += "vulnFixVersion: " + vulnFixVersion + "\n" |
| 93 | + mitigation += "suggestedFix: " + packageSuggestedFix + "\n" |
| 94 | + |
| 95 | + finding = Finding( |
| 96 | + title=vulnName + " - " + packageName + " - " + vulnFixVersion, |
| 97 | + test=test, |
| 98 | + description=description, |
| 99 | + severity=vulnSeverity, |
| 100 | + mitigation=mitigation, |
| 101 | + static_finding=True, |
| 102 | + component_name=packageName, |
| 103 | + component_version=packageVersion, |
| 104 | + ) |
| 105 | + |
| 106 | + try: |
| 107 | + if float(vulnCvssVersion) >= 3 and float(vulnCvssVersion) < 4: |
| 108 | + finding.cvssv3_score = vulnCvssScore |
| 109 | + vectors = cvss.parser.parse_cvss_from_text(vulnCvssVector) |
| 110 | + if len(vectors) > 0 and isinstance(vectors[0], CVSS3): |
| 111 | + finding.cvss = vectors[0].clean_vector() |
| 112 | + except ValueError: |
| 113 | + continue |
| 114 | + |
| 115 | + if vulnName != "": |
| 116 | + finding.unsaved_vulnerability_ids = [] |
| 117 | + finding.unsaved_vulnerability_ids.append(vulnName) |
| 118 | + findings.append(finding) |
| 119 | + return findings |
| 120 | + |
| 121 | + def parse_csv(self, arr_data, test): |
| 122 | + if len(arr_data) == 0: |
| 123 | + return () |
| 124 | + sysdig_report_findings = [] |
| 125 | + for row in arr_data: |
| 126 | + finding = Finding(test=test) |
| 127 | + # Generate finding |
| 128 | + finding.title = f"{row.vulnerability_id} - {row.package_name}" |
| 129 | + finding.vuln_id_from_tool = row.vulnerability_id |
| 130 | + finding.unsaved_vulnerability_ids = [] |
| 131 | + finding.unsaved_vulnerability_ids.append(row.vulnerability_id) |
| 132 | + finding.severity = row.severity |
| 133 | + # Set Component Version |
| 134 | + finding.component_name = row.package_name |
| 135 | + finding.component_version = row.package_version |
| 136 | + # Set some finding tags |
| 137 | + tags = [] |
| 138 | + if row.vulnerability_id != "": |
| 139 | + tags.append("VulnId: " + row.vulnerability_id) |
| 140 | + finding.tags = tags |
| 141 | + finding.dynamic_finding = False |
| 142 | + finding.static_finding = True |
| 143 | + finding.description += "\n\n###Vulnerability Details" |
| 144 | + finding.description += f"\n - **Vulnerability ID:** {row.vulnerability_id}" |
| 145 | + finding.description += f"\n - **Vulnerability Link:** {row.vuln_link}" |
| 146 | + finding.description += f"\n - **Severity:** {row.severity}" |
| 147 | + finding.description += f"\n - **Publish Date:** {row.vuln_publish_date}" |
| 148 | + finding.description += f"\n - **CVSS Version:** {row.cvss_version}" |
| 149 | + finding.description += f"\n - **CVSS Vector:** {row.cvss_vector}" |
| 150 | + if row.public_exploit != "": |
| 151 | + finding.description += f"\n - **Public Exploit:** {row.public_exploit}" |
| 152 | + finding.description += "\n\n###Package Details" |
| 153 | + if row.package_type == "os": |
| 154 | + finding.description += f"\n - **Package Type: {row.package_type} \\* Consider upgrading your Base OS \\***" |
| 155 | + else: |
| 156 | + finding.description += f"\n - **Package Type:** {row.package_type}" |
| 157 | + finding.description += f"\n - **Package Name:** {row.package_name}" |
| 158 | + finding.description += f"\n - **Package Version:** {row.package_version}" |
| 159 | + if row.package_path != "": |
| 160 | + finding.description += f"\n - **Package Path:** {row.package_path}" |
| 161 | + finding.file_path = row.package_path |
| 162 | + try: |
| 163 | + if float(row.cvss_version) >= 3 and float(row.cvss_version) < 4: |
| 164 | + finding.cvssv3_score = float(row.cvss_score) |
| 165 | + vectors = cvss.parser.parse_cvss_from_text(row.cvss_vector) |
| 166 | + if len(vectors) > 0 and isinstance(vectors[0], CVSS3): |
| 167 | + finding.cvss = vectors[0].clean_vector() |
| 168 | + except ValueError: |
| 169 | + continue |
| 170 | + finding.risk_accepted = row.risk_accepted |
| 171 | + # Set reference |
| 172 | + if row.vuln_link != "": |
| 173 | + finding.references = row.vuln_link |
| 174 | + finding.url = row.vuln_link |
| 175 | + finding.epss_score = row.epss_score |
| 176 | + # finally, Add finding to list |
| 177 | + sysdig_report_findings.append(finding) |
| 178 | + return sysdig_report_findings |
| 179 | + |
| 180 | + def load_csv(self, filename) -> SysdigData: |
| 181 | + |
| 182 | + if filename is None: |
| 183 | + return () |
| 184 | + |
| 185 | + content = filename.read() |
| 186 | + if isinstance(content, bytes): |
| 187 | + content = content.decode("utf-8") |
| 188 | + reader = csv.DictReader(io.StringIO(content), delimiter=",", quotechar='"') |
| 189 | + |
| 190 | + # normalise on lower case for consistency |
| 191 | + reader.fieldnames = [name.lower() for name in reader.fieldnames] |
| 192 | + |
| 193 | + csvarray = [] |
| 194 | + |
| 195 | + for row in reader: |
| 196 | + # Compare headers to values. |
| 197 | + if len(row) != len(reader.fieldnames): |
| 198 | + msg = f"Number of fields in row ({len(row)}) does not match number of headers ({len(reader.fieldnames)})" |
| 199 | + raise ValueError(msg) |
| 200 | + |
| 201 | + # Check for a CVE value to being with |
| 202 | + if not row[reader.fieldnames[0]].startswith("CVE"): |
| 203 | + msg = f"Expected 'CVE' at the start but got: {row[reader.fieldnames[0]]}" |
| 204 | + raise ValueError(msg) |
| 205 | + |
| 206 | + csvarray.append(row) |
| 207 | + |
| 208 | + if "vulnerability id" in reader.fieldnames: |
| 209 | + msg = "Unknown CSV format: Vulnerability ID column found, looks like a SysDig Vulnerability Report" |
| 210 | + raise ValueError(msg) |
| 211 | + |
| 212 | + if "cve id" not in reader.fieldnames: |
| 213 | + msg = "Unknown CSV format: expected CVE ID column" |
| 214 | + raise ValueError(msg) |
| 215 | + |
| 216 | + arr_csv_data = [] |
| 217 | + for row in csvarray: |
| 218 | + |
| 219 | + csv_data_record = SysdigData() |
| 220 | + msg = "" |
| 221 | + # Sydig CLI format |
| 222 | + csv_data_record.vulnerability_id = row.get("cve id", "") |
| 223 | + csv_data_record.severity = csv_data_record._map_severity(row.get("cve severity").upper()) |
| 224 | + csv_data_record.cvss_score = row.get("cvss score", "") |
| 225 | + csv_data_record.cvss_version = row.get("cvss score version", "") |
| 226 | + csv_data_record.package_name = row.get("package name", "") |
| 227 | + csv_data_record.package_version = row.get("package version", "") |
| 228 | + csv_data_record.package_type = row.get("package type", "") |
| 229 | + csv_data_record.package_path = row.get("package path", "") |
| 230 | + csv_data_record.vuln_fix_version = row.get("fix version", "") |
| 231 | + csv_data_record.vuln_link = row.get("cve url", "") |
| 232 | + csv_data_record.vuln_publish_date = row.get("vuln disclosure date", "") |
| 233 | + csv_data_record.vuln_fix_date = row.get("vuln fix date", "") |
| 234 | + csv_data_record.risk_accepted = row.get("risk accepted", "") == "TRUE" |
| 235 | + |
| 236 | + # new fields: |
| 237 | + csv_data_record.epss_score = row.get("epss score", "") |
| 238 | + |
| 239 | + # not present: |
| 240 | + # csv_data_record.public_exploit = row.get("public exploit", "") |
| 241 | + # csv_data_record.cvss_vector = row.get("cvss vector", "") |
| 242 | + # csv_data_record.image = row.get("image", "") |
| 243 | + # csv_data_record.os_name = row.get("os name", "") |
| 244 | + # csv_data_record.k8s_cluster_name = row.get("k8s cluster name", "") |
| 245 | + # csv_data_record.k8s_namespace_name = row.get("k8s namespace name", "") |
| 246 | + # csv_data_record.k8s_workload_type = row.get("k8s workload type", "") |
| 247 | + # csv_data_record.k8s_workload_name = row.get("k8s workload name", "") |
| 248 | + # csv_data_record.k8s_container_name = row.get("k8s container name", "") |
| 249 | + # csv_data_record.image_id = row.get("image id", "") |
| 250 | + # csv_data_record.k8s_pod_count = row.get("k8s pod count", "") |
| 251 | + # csv_data_record.package_suggested_fix = row.get("package suggested fix", "") |
| 252 | + # csv_data_record.in_use = row.get("in use", "") == "TRUE" |
| 253 | + # csv_data_record.registry_name = row.get("registry name", "") |
| 254 | + # csv_data_record.registry_image_repository = row.get("registry image repository", "") |
| 255 | + # csv_data_record.cloud_provider_name = row.get("cloud provider name", "") |
| 256 | + # csv_data_record.cloud_provider_account_id = row.get("cloud provider account ID", "") |
| 257 | + # csv_data_record.cloud_provider_region = row.get("cloud provider region", "") |
| 258 | + # csv_data_record.registry_vendor = row.get("registry vendor", "") |
| 259 | + |
| 260 | + arr_csv_data.append(csv_data_record) |
| 261 | + |
| 262 | + return arr_csv_data |
0 commit comments