Skip to content

Commit 88e9f59

Browse files
authored
Mayhem SARIF support (new parser) (#12624)
* rebase * linter checks * more linter checks * extend SarifParser instead of copying code * make linter happy * linter again * make sure func signatures match * tests * linter
1 parent 96ddf1c commit 88e9f59

File tree

10 files changed

+2740
-0
lines changed

10 files changed

+2740
-0
lines changed
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
---
2+
title: "Mayhem SARIF Reports"
3+
toc_hide: true
4+
---
5+
Import for Mayhem-generated SARIF reports. In general, the existing
6+
SARIF report consumer should work, and for general cases does. However,
7+
since Mayhem (a) is a DAST tool and (b) includes fuzzed data in the content of
8+
the report, a Mayhem-specific SARIF consumer is added.
9+
See more below:
10+
[Mayhem SARIF Report (API)](https://docs.mayhem.security/api-testing/tutorials/identifying-api-issues/bug-reporting/#sarif-reports).
11+
[Mayhem SARIF Report (CI)](https://docs.mayhem.security/integrations/ci-integrations/github/#analyzing-sarif-reports).
12+
13+
14+
### Parity with Existing SARIF Consumer
15+
16+
The current implementation is mostly lifted from the existing SARIF parser support. As such, it will also aggregate all the findings in the SARIF file in one single report, and it also supports fingerprint deduplication.
17+
18+
### Sample Scan Data
19+
Sample Mayhem SARIF reports can be found [here](https://github.com/DefectDojo/django-DefectDojo/tree/master/unittests/scans/mayhem).

dojo/tools/mayhem/__init__.py

Whitespace-only changes.

dojo/tools/mayhem/parser.py

Lines changed: 332 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
import json
2+
import logging
3+
import re
4+
5+
import dateutil.parser
6+
from django.utils.translation import gettext as _
7+
8+
from dojo.models import Finding
9+
from dojo.tools.parser_test import ParserTest
10+
from dojo.tools.sarif.parser import (
11+
SarifParser,
12+
cve_try,
13+
cvss_to_severity,
14+
get_codeFlowsDescription,
15+
get_fingerprints_hashes,
16+
get_properties_tags,
17+
get_references,
18+
get_result_cwes_properties,
19+
get_rule_cwes,
20+
get_rules,
21+
get_severity,
22+
get_snippet,
23+
get_title,
24+
)
25+
26+
logger = logging.getLogger(__name__)
27+
28+
CWE_REGEX = r"cwe-\d+"
29+
30+
31+
class MayhemParser(SarifParser):

    """
    Parser for SARIF reports produced by Mayhem.

    Builds on the stock SARIF parser, overriding just enough to cope with
    the structure of Mayhem SARIF reports (markdown messages, taxa-based
    CWEs, fuzzed payload bytes in message content).
    """

    def get_scan_types(self):
        return ["Mayhem SARIF Report"]

    def get_description_for_scan_types(self, scan_type):
        return "Mayhem SARIF reports from code or API runs."

    # These overrides mirror the parent implementation. They must be redefined
    # here because __get_items_from_run is name-mangled per class: the parent's
    # methods resolve it to _SarifParser__get_items_from_run and would never
    # reach the Mayhem-specific version below.
    def get_findings(self, filehandle, test):
        """Aggregate the findings of every run into a single flat list."""
        report = json.load(filehandle)
        findings = []
        for sarif_run in report.get("runs", []):
            findings += self.__get_items_from_run(sarif_run)
        return findings

    def get_tests(self, scan_type, handle):
        """Return one ParserTest per SARIF run, named after the tool driver."""
        report = json.load(handle)
        parser_tests = []
        for sarif_run in report.get("runs", []):
            driver = sarif_run["tool"]["driver"]
            parser_test = ParserTest(
                name=driver["name"],
                parser_type=driver["name"],
                version=driver.get("version"),
            )
            parser_test.findings = self.__get_items_from_run(sarif_run)
            parser_tests.append(parser_test)
        return parser_tests

    def __get_items_from_run(self, run):
        # Rule metadata is looked up per result; load it once for the run.
        rules = get_rules(run)
        # Artifacts do not appear to be used anywhere.
        run_date = self.__get_last_invocation_date(run)
        collected = []
        for result in run.get("results", []):
            per_result = get_items_from_result(result, rules, run_date)
            if per_result:
                collected += per_result
        return collected

    def __get_last_invocation_date(self, data):
        # Use the 'endTimeUtc' of the last invocation, when present.
        invocations = data.get("invocations", [])
        if not invocations:
            return None
        raw_date = invocations[-1].get("endTimeUtc")
        return None if raw_date is None else dateutil.parser.isoparse(raw_date)
95+
96+
97+
def get_result_cwes_mcode(result):
    """Extract CWE numbers from a Mayhem result's ``taxa`` entries.

    Mayhem SARIF reports carry the CWE taxonomy under
    ``taxa[].toolComponent.name == "CWE"`` with the number in ``taxa[].id``.
    Returns an empty list when no matching taxa are present.
    """
    return [
        int(taxon["id"])
        for taxon in result.get("taxa", [])
        if taxon.get("toolComponent", {}).get("name") == "CWE" and taxon.get("id")
    ]
107+
108+
109+
def clean_mayhem_title_text(text):
    """Normalize a Mayhem finding title by stripping markup and IDs.

    Removes, in order: markdown links (with bounded quantifiers to avoid
    catastrophic backtracking), numeric character references such as
    ``&#x27;``, single/double quotes, and Mayhem ``TDID-<n>`` prefixes.
    Falsy input yields an empty string.
    """
    if not text:
        return ""

    removal_patterns = (
        r"\[[^\]]{1,100}?\]\([^)]{1,200}?\)",  # markdown links
        r"&#x\d+;",                            # URL/HTML encoded characters
        r"[\"']",                              # single or double quotes
        r"TDID-\d+\s*-\s*|TDID-\d+-",          # TDID prefixes
    )
    for pattern in removal_patterns:
        text = re.sub(pattern, "", text)

    return text.strip()
131+
132+
133+
def get_message_from_multiformatMessageString(data, rule, content_type="text"):
    """Resolve a SARIF multiformatMessageString to a plain string.

    Differs from the base SARIF implementation in that it accepts a
    ``content_type`` of ``"markdown"``: headings are stripped (so nothing
    changes the rendered text size) and bytes outside printable ASCII are
    replaced with ``?`` so fuzzed payloads cannot leak into descriptions.
    Returns an empty string when the requested content is absent.
    """
    if content_type == "markdown" and "markdown" in data:
        rendered = data.get("markdown")
        rendered = re.sub(r"^#+\s*", "", rendered, flags=re.MULTILINE)
        rendered = re.sub(r"[^\x09\x0A\x0D\x20-\x7E]", "?", rendered)
        return rendered.strip()

    if content_type != "text" or "text" not in data:
        return ""

    text = data.get("text")
    # When the message references a rule message string by id, prefer that.
    if rule is not None and "id" in data:
        text = rule["messageStrings"][data["id"]].get("text")
    # Substitute placeholders {0}..{5}; the SARIF spec limits arguments to 6.
    arguments = data.get("arguments", [])
    for index in range(6):
        placeholder = "{" + str(index) + "}"
        if placeholder in text and index < len(arguments):
            text = text.replace(placeholder, arguments[index])
    return text
162+
163+
164+
def get_description(result, rule, location):
    """Build a markdown description for one finding.

    Overrides the base SARIF ``get_description`` so the Mayhem-specific
    ``get_message_from_multiformatMessageString`` is used and any markdown
    body in the result message is appended as "Additional Details".

    :param result: SARIF ``result`` object for the finding
    :param rule: matching SARIF rule object, or None
    :param location: SARIF ``location`` object, or None
    :return: description text with the trailing newline removed
    """
    description = ""
    message = ""
    if "message" in result:
        message = get_message_from_multiformatMessageString(
            result["message"], rule,
        )
        description += f"**Result message:** {message}\n"
    # Hoisted: the original called get_snippet(location) twice.
    snippet = get_snippet(location)
    if snippet is not None:
        description += f"**Snippet:**\n```\n{snippet}\n```\n"
    if rule is not None:
        if "name" in rule:
            description += f"**{_('Rule name')}:** {rule.get('name')}\n"
        shortDescription = ""
        if "shortDescription" in rule:
            shortDescription = get_message_from_multiformatMessageString(
                rule["shortDescription"], rule,
            )
            # Skip descriptions that merely repeat the result message.
            if shortDescription != message:
                description += f"**{_('Rule short description')}:** {shortDescription}\n"
        if "fullDescription" in rule:
            fullDescription = get_message_from_multiformatMessageString(
                rule["fullDescription"], rule,
            )
            if (fullDescription != message) and (fullDescription != shortDescription):
                description += f"**{_('Rule full description')}:** {fullDescription}\n"
    # Guard with .get(): "message" may be absent from the result, and direct
    # indexing would raise KeyError before the markdown check could run.
    if "markdown" in result.get("message", {}):
        markdown = get_message_from_multiformatMessageString(
            result["message"], rule, content_type="markdown",
        )
        # Replace "Details" with "Link" in the markdown
        markdown = markdown.replace("Details", "Link")
        description += f"**{_('Additional Details')}:**\n{markdown}\n"
        description += "_(Unprintable characters are replaced with '?'; please see Mayhem for full reproducer.)_"
    if len(result.get("codeFlows", [])) > 0:
        description += get_codeFlowsDescription(result["codeFlows"])

    return description.removesuffix("\n")
203+
204+
205+
def get_items_from_result(result, rules, run_date):
    """Convert one SARIF ``result`` into a list of Finding objects.

    One Finding is produced per location in the result (or a single Finding
    with no file/line when the result has no locations).

    :param result: SARIF ``result`` object
    :param rules: mapping of ruleId -> SARIF rule object (from ``get_rules``)
    :param run_date: datetime of the run's last invocation, or None
    :return: list of Findings, or None when the result kind is not "fail"
    """
    # see
    # https://docs.oasis-open.org/sarif/sarif/v2.1.0/csprd01/sarif-v2.1.0-csprd01.html
    # / 3.27.9
    kind = result.get("kind", "fail")
    if kind != "fail":
        return None

    # if finding is suppressed, mark it as False Positive
    # Note: see
    # https://docs.oasis-open.org/sarif/sarif/v2.0/csprd02/sarif-v2.0-csprd02.html#_Toc10127852
    suppressed = False
    if result.get("suppressions"):
        suppressed = True

    # if there is a location, collect every (file, line, location) triple

    files = []

    if "locations" in result:
        for location in result["locations"]:

            file_path = None
            line = None

            if "physicalLocation" in location:
                file_path = location["physicalLocation"]["artifactLocation"]["uri"]

                # 'region' attribute is optional
                if "region" in location["physicalLocation"]:
                    # https://docs.oasis-open.org/sarif/sarif/v2.0/csprd02/sarif-v2.0-csprd02.html / 3.30.1
                    # a byteOffset region has no meaningful line number, so skip it
                    if "byteOffset" in location["physicalLocation"]["region"]:
                        pass
                    else:
                        line = location["physicalLocation"]["region"]["startLine"]

            files.append((file_path, line, location))

    # ensure at least one Finding is emitted even without locations
    if not files:
        files.append((None, None, None))

    result_items = []

    for file_path, line, location in files:

        # look up the rule that produced this result (may be None)
        rule = rules.get(result.get("ruleId"))

        # Mayhem findings are DAST results, hence dynamic_finding=True.
        finding = Finding(
            title=clean_mayhem_title_text(get_title(result, rule)),
            severity=get_severity(result, rule),
            description=get_description(result, rule, location),
            static_finding=False,
            dynamic_finding=True,
            false_p=suppressed,
            active=not suppressed,
            file_path=file_path,
            line=line,
            references=get_references(rule),
        )

        if "ruleId" in result:
            finding.vuln_id_from_tool = result["ruleId"]
            # for now we only support the case where the rule id is a CVE
            if cve_try(result["ruleId"]):
                finding.unsaved_vulnerability_ids = [cve_try(result["ruleId"])]
        # sometimes the rule id is present but the tool doesn't define the rule
        if rule is not None:
            cwes_extracted = get_rule_cwes(rule)
            # also collect CWEs from Mayhem's taxa entries
            cwes_extracted.extend(get_result_cwes_mcode(result))
            # NOTE(review): only the last CWE is kept — Finding.cwe appears to
            # hold a single value; confirm against the model definition.
            if len(cwes_extracted) > 0:
                finding.cwe = cwes_extracted[-1]

            # Some tools such as GitHub or Grype return the severity in properties
            # instead
            if "properties" in rule and "security-severity" in rule["properties"]:
                try:
                    cvss = float(rule["properties"]["security-severity"])
                    severity = cvss_to_severity(cvss)
                    finding.cvssv3_score = cvss
                    finding.severity = severity
                except ValueError:
                    # not numeric: accept a textual severity if it is one of
                    # the known levels, otherwise fall back to "Info"
                    if rule["properties"]["security-severity"].lower().capitalize() in {"Info", "Low", "Medium", "High", "Critical"}:
                        finding.severity = rule["properties"]["security-severity"].lower().capitalize()
                    else:
                        finding.severity = "Info"

        # handle the case where tools produce CWE as properties of the result
        # (takes precedence over any rule-derived CWE set above)
        cwes_properties_extracted = get_result_cwes_properties(result)
        if len(cwes_properties_extracted) > 0:
            finding.cwe = cwes_properties_extracted[-1]

        # manage fixes provided in the report
        if "fixes" in result:
            finding.mitigation = "\n".join(
                [fix.get("description", {}).get("text") for fix in result["fixes"]],
            )

        if run_date:
            finding.date = run_date

        # merge tags from the rule and the result, removing duplicates
        tags = list(set(get_properties_tags(rule) + get_properties_tags(result)))
        tags = [s.removeprefix("external/cwe/") for s in tags]
        finding.tags = tags

        # manage fingerprints
        # fingerprinting in SARIF is more complete than the current implementation;
        # the SARIF standard makes it possible to have multiple versions in the
        # same report, so for now we just take the first one and keep the format
        # to be able to compare it
        if result.get("fingerprints"):
            hashes = get_fingerprints_hashes(result["fingerprints"])
            first_item = next(iter(hashes.items()))
            finding.unique_id_from_tool = first_item[1]["value"]
        elif result.get("partialFingerprints"):
            # for this one we keep an order so the ids can be compared
            hashes = get_fingerprints_hashes(result["partialFingerprints"])
            sorted_hashes = sorted(hashes.keys())
            finding.unique_id_from_tool = "|".join(
                [f'{key}:{hashes[key]["value"]}' for key in sorted_hashes],
            )

        result_items.append(finding)

    return result_items

0 commit comments

Comments
 (0)