|
| 1 | +import re |
| 2 | +from typing import Any, Optional, Tuple, Union |
| 3 | + |
| 4 | +import cvss.parser |
| 5 | +import dateutil.parser |
| 6 | +from cpe import CPE |
| 7 | +from django.core.exceptions import ImproperlyConfigured |
| 8 | + |
| 9 | +from dojo.models import Endpoint, Finding |
| 10 | + |
| 11 | +####### |
| 12 | +# Helpers/Utils |
| 13 | +####### |
| 14 | + |
# Matches the literal "[[markup]]" tag as well as any bare "[[" or "]]" delimiter inside an entry value
MARKUP_STRIPPING_PATTERN = re.compile(r"\[\[markup\]\]|\[\[|\]\]")


def strip_markup(value: str) -> str:
    """
    Return value with every "[[markup]]", "[[" and "]]" occurrence removed and surrounding whitespace trimmed.

    Falsy input (e.g. None or the empty string) is handed back untouched.
    """
    if not value:
        return value
    without_markup = MARKUP_STRIPPING_PATTERN.sub("", value)
    return without_markup.strip()
| 26 | + |
| 27 | + |
| 28 | +####### |
| 29 | +# Field parsing helper classes |
| 30 | +####### |
class FieldType:
    """
    Common ancestor of the per-field handlers used by the engine parsers.

    An instance stores a target name (a plain string) and is callable: calling it forwards its arguments to
    .handle(), which subclasses are expected to implement.

    Because target names are only strings, nothing verifies them when the handler is created. Subclasses should
    therefore implement check() to validate their configuration (as far as can be determined at call time) and raise
    ImproperlyConfigured when it does not make sense.
    """
    def __init__(self, target_name):
        # Name of whatever this handler targets; its interpretation is left to the subclass
        self.target_name = target_name

    def __call__(self, engine_class, finding, value):
        # Delegates to handle(); whatever handle() returns is discarded
        self.handle(engine_class, finding, value)

    def handle(self, engine_class, finding, value):
        """Process value for finding. The base implementation does nothing."""

    def check(self, engine_parser):
        """Validate this handler against engine_parser. The base implementation accepts everything."""
| 51 | + |
| 52 | + |
class Attribute(FieldType):
    """
    Handler for an input field that is copied verbatim onto a Finding attribute. The target name given at
    construction is the Finding attribute to set; handling a value assigns it to that attribute.
    """
    def handle(self, engine_class, finding, value):
        attribute_name = self.target_name
        setattr(finding, attribute_name, value)

    def check(self, engine_parser):
        # Looked up on the Finding class itself, so no Finding instance is needed to validate the name
        if hasattr(Finding, self.target_name):
            return
        msg = f"Finding does not have attribute '{self.target_name}.'"
        raise ImproperlyConfigured(msg)
| 65 | + |
| 66 | + |
class DeMarkupedAttribute(Attribute):
    """
    An Attribute whose incoming value is run through strip_markup() before being assigned to the Finding.
    """
    def handle(self, engine_class, finding, value):
        cleaned_value = strip_markup(value)
        super().handle(engine_class, finding, cleaned_value)
| 73 | + |
| 74 | + |
class Method(FieldType):
    """
    Handler for an input field that needs custom processing. The target name given at construction is the name of a
    method on the engine parser; handling a value looks that method up on the passed-in parser and calls it with the
    Finding and the value. Any change to the Finding is made by that method, never by this class.
    """
    def handle(self, engine_parser, finding, value):
        processor = getattr(engine_parser, self.target_name)
        processor(finding, value)

    def check(self, engine_parser):
        # A missing attribute resolves to None, which is not callable, so both failure modes share one branch
        candidate = getattr(engine_parser, self.target_name, None)
        if callable(candidate):
            return
        msg = f"{type(engine_parser).__name__} does not have method '{self.target_name}().'"
        raise ImproperlyConfigured(msg)
| 88 | + |
| 89 | + |
class BaseEngineParser:
    """
    Parser for data shared by all engines used by AppCheck, as well as data from an unknown/unspecified engine.

    Directly mapped attributes, from JSON object -> Finding attribute:
        * _id -> unique_id_from_tool
        * cvss_v3_vector -> cvssv3
        * epss_base_score -> epss_score

    Directly mapped attributes but value is stripped of "markup" first, JSON Object -> Finding attribute:
        * title -> title
        * description -> description
        * solution -> mitigation

    Data mapped with a bit of tinkering, JSON object -> Finding attribute:
        * first_detected_at -> date (parse date)
        * status -> active/false_p/risk_accepted (depending on value)
        * cves -> unsaved_vulnerability_ids (vulnerability_ids)
        * cpe -> component name/version
        * cvss_vector -> severity (determined using CVSS package)
        * notes -> appended to Finding description
        * details -> appended to Finding description

    Child classes can override the _ENGINE_FIELDS_MAP dictionary to support extended/different functionality as so
    desired, without having to change/copy the common field parsing described above.
    """
    SCANNING_ENGINE = "Unknown"

    # Field handling common to all findings returned by AppCheck
    _COMMON_FIELDS_MAP: dict[str, FieldType] = {
        "_id": Attribute("unique_id_from_tool"),
        "cvss_v3_vector": Attribute("cvssv3"),
        "epss_base_score": Attribute("epss_score"),
        "title": DeMarkupedAttribute("title"),
        "description": DeMarkupedAttribute("description"),
        "solution": DeMarkupedAttribute("mitigation"),
        "first_detected_at": Method("parse_initial_date"),
        "status": Method("parse_status"),
        "cves": Method("parse_cves"),
        "cpe": Method("parse_components"),
        "cvss_vector": Method("parse_severity"),
        # These should be listed after the 'description' entry; they append to it
        "notes": Method("parse_notes"),
        "details": Method("parse_details")}

    # Field handling specific to a given scanning_engine AppCheck uses
    _ENGINE_FIELDS_MAP: dict[str, FieldType] = {}

    def __init__(self):
        """
        Validate every configured field handler against this parser.

        Each handler's check() may raise ImproperlyConfigured when its target does not exist.
        """
        # Do a basic check that the fields we'll process over are valid
        for field_handler in self.get_engine_fields().values():
            field_handler.check(self)

    #####
    # For parsing the initial finding datetime to a date format pleasing to Finding
    #####
    def get_date(self, value: str) -> Optional[str]:
        """
        Parse value with dateutil and return the date portion as a string, or None when it cannot be parsed.
        """
        try:
            return str(dateutil.parser.parse(value).date())
        except (dateutil.parser.ParserError, OverflowError, TypeError):
            # dateutil documents OverflowError (date out of range) and TypeError (non-string input) in addition to
            # ParserError; all of them simply mean "no usable date here"
            return None

    def parse_initial_date(self, finding: Finding, value: str) -> None:
        """Set finding.date to the parsed date of value."""
        # NOTE(review): an unparseable value assigns None to finding.date -- confirm Finding tolerates that
        finding.date = self.get_date(value)

    #####
    # For parsing CVEs
    #####
    CVE_PATTERN = re.compile("CVE-[0-9]+-[0-9]+", re.IGNORECASE)

    def is_cve(self, c: str) -> bool:
        """Return True only when c is a non-empty string that matches CVE_PATTERN in its entirety."""
        return bool(c and isinstance(c, str) and self.CVE_PATTERN.fullmatch(c))

    def parse_cves(self, finding: Finding, value: list[str]) -> None:
        """Store the upper-cased entries of value that look like CVE ids in finding.unsaved_vulnerability_ids."""
        finding.unsaved_vulnerability_ids = [c.upper() for c in value if self.is_cve(c)]

    #####
    # Handles setting various status flags on the Finding
    #####
    def parse_status(self, finding: Finding, value: str) -> None:
        """
        Translate a status string (case-insensitive) into Finding flags.

        "fixed" clears active, "false_positive" sets false_p, "acceptable_risk" sets risk_accepted; any other value
        leaves the Finding untouched.
        """
        # Possible values (best guess): unfixed (the initial value), fixed, false_positive, and acceptable_risk
        value = value.lower()
        if value == "fixed":
            finding.active = False
        elif value == "false_positive":
            finding.false_p = True
        elif value == "acceptable_risk":
            finding.risk_accepted = True

    #####
    # For severity (extracted from cvss vector)
    #####
    def get_severity(self, value: str) -> Optional[str]:
        """
        Derive a Finding severity from text containing a CVSS vector.

        Uses the first CVSS object found in value and the first entry of its severities() (presumably the base
        severity -- verify against the cvss package), title-cased. Returns None when nothing was parsed or the
        result is not one of Finding.SEVERITIES.
        """
        if cvss_obj := cvss.parser.parse_cvss_from_text(value):
            if (severity := cvss_obj[0].severities()[0].title()) in Finding.SEVERITIES:
                return severity
        return None

    def parse_severity(self, finding: Finding, value: str) -> None:
        """Set finding.severity from value, leaving it unchanged when no severity could be determined."""
        if severity := self.get_severity(value):
            finding.severity = severity

    #####
    # For parsing component data
    #####
    def parse_cpe(self, cpe_str: str) -> Tuple[Optional[str], Optional[str]]:
        """
        Return (product, version) extracted from a CPE string.

        Each element is the first value reported by the CPE object, or None when the CPE object reports no value or
        an empty one. An empty cpe_str yields (None, None).
        """
        if not cpe_str:
            return None, None
        cpe_obj = CPE(cpe_str)
        products = cpe_obj.get_product()
        versions = cpe_obj.get_version()
        return (
            # The trailing "or None" normalizes an empty first entry to None
            (products[0] if products else None) or None,
            (versions[0] if versions else None) or None,
        )

    def parse_components(self, finding: Finding, value: list[str]) -> None:
        """Set finding.component_name and finding.component_version from the first CPE entry in value."""
        # Only use the first entry
        finding.component_name, finding.component_version = self.parse_cpe(value[0])

    #####
    # For parsing additional description-related entries (description, notes, and details)
    #####
    def format_additional_description(self, section: str, value: Any) -> str:
        """
        Render one extra description section as "**<section>**: <value>".

        String values are stripped of markup first. Non-string values (detail entries may be nested dicts or lists)
        are rendered with their default string form, since the markup pattern can only be applied to strings.
        """
        text = strip_markup(value) if isinstance(value, str) else value
        return f"**{section}**: {text}"

    def append_description(self, finding: Finding, addendum: dict[str, Any]) -> None:
        """
        Append every section in addendum to finding.description, separating sections with a blank line.

        Does nothing when addendum is empty.
        """
        if addendum:
            # Only add a separator when there is already text to separate from
            if finding.description:
                finding.description += "\n\n"
            finding.description += "\n\n".join(
                self.format_additional_description(k, v) for k, v in addendum.items())

    def parse_notes(self, finding: Finding, value: str) -> None:
        """Append value to the Finding description under a "Notes" section."""
        self.append_description(finding, {"Notes": value})

    def extract_details(
        self,
        value: Union[str, dict[str, Union[str, dict[str, list[str]]]]],
    ) -> dict[str, Union[str, dict[str, list[str]]]]:
        """
        Turn a "details" entry into a mapping of section name -> content.

        A dict is returned without its "_meta" key (values are passed through unchanged); anything else is
        stringified and filed under a single "Details" section.
        """
        if isinstance(value, dict):
            return {k: v for k, v in value.items() if k != "_meta"}
        return {"Details": str(value)}

    def parse_details(self, finding: Finding, value: Union[str, dict[str, Union[str, dict[str, list[str]]]]]) -> None:
        """Append the sections extracted from value to the Finding description."""
        self.append_description(finding, self.extract_details(value))

    #####
    # For parsing endpoints
    #####
    def get_host(self, item: dict[str, Any]) -> Optional[str]:
        """Return the first truthy value among item's "url", "host" and "ipv4_address" entries, else None."""
        return item.get("url") or item.get("host") or item.get("ipv4_address") or None

    def parse_port(self, item: Any) -> Optional[int]:
        """Convert item to an int and return it when it lies in 1..65535; otherwise return None."""
        try:
            int_val = int(item)
            if 0 < int_val <= 65535:
                return int_val
        except (ValueError, TypeError):
            # Not convertible to an integer: treated the same as "no port"
            pass
        return None

    def get_port(self, item: dict[str, Any]) -> Optional[int]:
        """Return the validated port from item's "port" entry, or None."""
        return self.parse_port(item.get("port"))

    def construct_endpoint(self, host: str, port: Optional[int]) -> Endpoint:
        """
        Build an Endpoint for host and port.

        host is first interpreted via Endpoint.from_uri(); when that yields an Endpoint with a host, a truthy port
        overrides the Endpoint's port. Otherwise a plain Endpoint(host=host, port=port) is created.
        """
        endpoint = Endpoint.from_uri(host)
        if endpoint.host:
            if port:
                endpoint.port = port
        else:
            endpoint = Endpoint(host=host, port=port)
        return endpoint

    def parse_endpoints(self, item: dict[str, Any]) -> list[Endpoint]:
        """Return a one-element list with the Endpoint described by item, or an empty list when it has no host."""
        # Endpoint requires a host
        if host := self.get_host(item):
            port = self.get_port(item)
            return [self.construct_endpoint(host, port)]
        return []

    def set_endpoints(self, finding: Finding, item: Any) -> None:
        """Extend finding.unsaved_endpoints with the Endpoints parsed from item."""
        endpoints = self.parse_endpoints(item)
        finding.unsaved_endpoints.extend(endpoints)

    # Returns the complete field processing map: common fields plus any engine-specific
    def get_engine_fields(self) -> dict[str, FieldType]:
        """Return the common field map merged with this engine's map; engine entries win on key clashes."""
        return {
            **BaseEngineParser._COMMON_FIELDS_MAP,
            **self._ENGINE_FIELDS_MAP}

    def get_finding_key(self, finding: Finding) -> Tuple:
        """
        Return a tuple identifying finding: severity, title, sorted (host, port) pairs of its unsaved endpoints, and
        the scanning engine name.
        """
        return (
            finding.severity,
            finding.title,
            tuple(sorted([(e.host, e.port) for e in finding.unsaved_endpoints])),
            self.SCANNING_ENGINE,
        )

    def parse_finding(self, item: dict[str, Any]) -> Tuple[Finding, Tuple]:
        """
        Build a Finding from one item.

        Every configured field whose value in item is truthy is passed to its handler, in map order; endpoints are
        then attached and a "Scanning Engine" section is appended to the description.

        Returns the Finding together with its key from get_finding_key().
        """
        finding = Finding()
        for field, field_handler in self.get_engine_fields().items():
            # Check first whether the field even exists on this item entry; if not, skip it
            if value := item.get(field):
                field_handler(self, finding, value)
        self.set_endpoints(finding, item)
        # Make a note of what scanning engine was used for this Finding
        self.append_description(finding, {"Scanning Engine": self.SCANNING_ENGINE})
        return finding, self.get_finding_key(finding)
0 commit comments