Skip to content

Commit 7b81a16

Browse files
💄 🪲 Fix Aqua parser severity justification (#12192)
* 💄 🪲 Fix Aqua parser severity justification * update * update
1 parent aa966d9 commit 7b81a16

File tree

2 files changed

+170
-175
lines changed

2 files changed

+170
-175
lines changed

dojo/tools/aqua/parser.py

Lines changed: 160 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ def get_dedupe_fields(self) -> list[str]:
9292
# "title",
9393
# "description",
9494
# ]
95+
9596
def get_scan_types(self):
9697
return ["Aqua Scan"]
9798

@@ -103,9 +104,6 @@ def get_description_for_scan_types(self, scan_type):
103104

104105
def get_findings(self, json_output, test):
105106
tree = json.load(json_output)
106-
return self.get_items(tree, test)
107-
108-
def get_items(self, tree, test):
109107
self.items = {}
110108
if isinstance(tree, list): # Aqua Scan Report coming from Azure Devops jobs (Windows based image)
111109
vulnerabilitytree = tree[0]["results"]["resources"] if tree else []
@@ -115,11 +113,15 @@ def get_items(self, tree, test):
115113
self.vulnerability_tree(vulnerabilitytree, test)
116114
elif "result" in tree: # Aqua Scan Report from apiv2
117115
resulttree = tree["result"]
118-
self.result_tree(resulttree, test)
116+
for vuln in resulttree:
117+
resource = vuln.get("resource")
118+
item = self.get_item(resource, vuln, test)
119+
unique_key = resource.get("cpe") + vuln.get("name", "None") + resource.get("path", "None")
120+
self.items[unique_key] = item
119121
elif "cves" in tree: # Aqua Scan Report from apiv1
120122
for cve in tree["cves"]:
121123
unique_key = cve.get("file") + cve.get("name")
122-
self.items[unique_key] = get_item_v2(cve, test)
124+
self.items[unique_key] = self.get_item_v2(cve, test)
123125
return list(self.items.values())
124126

125127
def vulnerability_tree(self, vulnerabilitytree, test):
@@ -130,189 +132,172 @@ def vulnerability_tree(self, vulnerabilitytree, test):
130132
if vulnerabilities is None:
131133
vulnerabilities = []
132134
for vuln in vulnerabilities:
133-
item = get_item(resource, vuln, test)
135+
item = self.get_item(resource, vuln, test)
134136
unique_key = resource.get("cpe") + vuln.get("name", "None") + resource.get("path", "None")
135137
self.items[unique_key] = item
136138
if sensitive_items is None:
137139
sensitive_items = []
138140
for sensitive_item in sensitive_items:
139-
item = get_item_sensitive_data(resource, sensitive_item, test)
141+
item = self.get_item_sensitive_data(resource, sensitive_item, test)
140142
unique_key = resource.get("cpe") + resource.get("path", "None") + str(sensitive_item)
141143
self.items[unique_key] = item
142144

143-
def result_tree(self, resulttree, test):
144-
for vuln in resulttree:
145-
resource = vuln.get("resource")
146-
item = get_item(resource, vuln, test)
147-
unique_key = resource.get("cpe") + vuln.get("name", "None") + resource.get("path", "None")
148-
self.items[unique_key] = item
149-
150-
151-
def get_item(resource, vuln, test):
152-
resource_name = resource.get("name", resource.get("path"))
153-
resource_version = resource.get("version", "No version")
154-
vulnerability_id = vuln.get("name", "No CVE")
155-
fix_version = vuln.get("fix_version", "None")
156-
description = vuln.get("description", "No description.") + "\n"
157-
if resource.get("path"):
158-
description += "**Path:** " + resource.get("path") + "\n"
159-
cvssv3 = None
160-
161-
url = ""
162-
if "nvd_url" in vuln:
163-
url += "\n{}".format(vuln.get("nvd_url"))
164-
if "vendor_url" in vuln:
165-
url += "\n{}".format(vuln.get("vendor_url"))
166-
167-
# Take in order of preference (most prio at the bottom of ifs), and put
168-
# everything in severity justification anyways.
169-
score = 0
170-
severity_justification = ""
171-
used_for_classification = ""
172-
if "aqua_severity" in vuln:
173-
score = vuln.get("aqua_severity")
174-
severity = aqua_severity_of(score)
175-
used_for_classification = (
176-
f"Aqua security score ({score}) used for classification.\n"
177-
)
178-
severity_justification = vuln.get("aqua_severity_classification")
179-
if "nvd_score_v3" in vuln:
180-
cvssv3 = vuln.get("nvd_vectors_v3")
181-
else:
145+
def get_item(self, resource, vuln, test):
146+
resource_name = resource.get("name", resource.get("path"))
147+
resource_version = resource.get("version", "No version")
148+
vulnerability_id = vuln.get("name", "No CVE")
149+
fix_version = vuln.get("fix_version", "None")
150+
description = vuln.get("description", "No description.") + "\n"
151+
if resource.get("path"):
152+
description += "**Path:** " + resource.get("path") + "\n"
153+
cvssv3 = None
154+
url = ""
155+
if "nvd_url" in vuln:
156+
url += "\n{}".format(vuln.get("nvd_url"))
157+
if "vendor_url" in vuln:
158+
url += "\n{}".format(vuln.get("vendor_url"))
159+
# Take in order of preference (most prio at the bottom of ifs), and put
160+
# everything in severity justification anyways.
161+
score = None
162+
severity_justification = ""
163+
used_for_classification = ""
164+
if "aqua_severity" in vuln:
165+
if score is None:
166+
score = vuln.get("aqua_severity")
167+
used_for_classification = (
168+
f"Aqua severity ({score}) used for classification.\n"
169+
)
170+
severity_justification += "\nAqua severity classification: {}".format(vuln.get("aqua_severity_classification"))
171+
severity_justification += "\nAqua scoring system: {}".format(vuln.get("aqua_scoring_system"))
172+
if "nvd_score_v3" in vuln:
173+
cvssv3 = vuln.get("nvd_vectors_v3")
182174
if "aqua_score" in vuln:
183-
score = vuln.get("aqua_score")
184-
used_for_classification = (
185-
f"Aqua score ({score}) used for classification.\n"
186-
)
187-
elif "vendor_score" in vuln:
188-
score = vuln.get("vendor_score")
189-
used_for_classification = (
190-
f"Vendor score ({score}) used for classification.\n"
191-
)
192-
elif "nvd_score_v3" in vuln:
193-
score = vuln.get("nvd_score_v3")
194-
used_for_classification = (
195-
f"NVD score v3 ({score}) used for classification.\n"
196-
)
197-
severity_justification += "\nNVD v3 vectors: {}".format(
198-
vuln.get("nvd_vectors_v3"),
199-
)
175+
if score is None:
176+
score = vuln.get("aqua_score")
177+
used_for_classification = (
178+
f"Aqua score ({score}) used for classification.\n"
179+
)
180+
severity_justification += "\nAqua score: {}".format(vuln.get("aqua_score"))
181+
if "vendor_score" in vuln:
182+
if score is None:
183+
score = vuln.get("vendor_score")
184+
used_for_classification = (
185+
f"Vendor score ({score}) used for classification.\n"
186+
)
187+
severity_justification += "\nVendor score: {}".format(vuln.get("vendor_score"))
188+
if "nvd_score_v3" in vuln:
189+
if score is None:
190+
score = vuln.get("nvd_score_v3")
191+
used_for_classification = (
192+
f"NVD score v3 ({score}) used for classification.\n"
193+
)
194+
severity_justification += "\nNVD v3 vectors: {}".format(vuln.get("nvd_vectors_v3"))
200195
# Add the CVSS3 to Finding
201196
cvssv3 = vuln.get("nvd_vectors_v3")
202-
elif "nvd_score" in vuln:
203-
score = vuln.get("nvd_score")
204-
used_for_classification = (
205-
f"NVD score v2 ({score}) used for classification.\n"
206-
)
207-
severity_justification += "\nNVD v2 vectors: {}".format(
208-
vuln.get("nvd_vectors"),
209-
)
210-
severity = severity_of(score)
197+
if "nvd_score" in vuln:
198+
if score is None:
199+
score = vuln.get("nvd_score")
200+
used_for_classification = (
201+
f"NVD score v2 ({score}) used for classification.\n"
202+
)
203+
severity_justification += "\nNVD v2 vectors: {}".format(vuln.get("nvd_vectors"))
211204
severity_justification += f"\n{used_for_classification}"
205+
severity = self.severity_of(score)
206+
finding = Finding(
207+
title=vulnerability_id
208+
+ " - "
209+
+ resource_name
210+
+ " ("
211+
+ resource_version
212+
+ ") ",
213+
test=test,
214+
severity=severity,
215+
severity_justification=severity_justification,
216+
cwe=0,
217+
cvssv3=cvssv3,
218+
description=description.strip(),
219+
mitigation=fix_version,
220+
references=url,
221+
component_name=resource.get("name"),
222+
component_version=resource.get("version"),
223+
impact=severity,
224+
)
225+
if vulnerability_id != "No CVE":
226+
finding.unsaved_vulnerability_ids = [vulnerability_id]
227+
if vuln.get("epss_score"):
228+
finding.epss_score = vuln.get("epss_score")
229+
if vuln.get("epss_percentile"):
230+
finding.epss_percentile = vuln.get("epss_percentile")
231+
return finding
212232

213-
finding = Finding(
214-
title=vulnerability_id
215-
+ " - "
216-
+ resource_name
217-
+ " ("
218-
+ resource_version
219-
+ ") ",
220-
test=test,
221-
severity=severity,
222-
severity_justification=severity_justification,
223-
cwe=0,
224-
cvssv3=cvssv3,
225-
description=description.strip(),
226-
mitigation=fix_version,
227-
references=url,
228-
component_name=resource.get("name"),
229-
component_version=resource.get("version"),
230-
impact=severity,
231-
)
232-
if vulnerability_id != "No CVE":
233-
finding.unsaved_vulnerability_ids = [vulnerability_id]
234-
if vuln.get("epss_score"):
235-
finding.epss_score = vuln.get("epss_score")
236-
if vuln.get("epss_percentile"):
237-
finding.epss_percentile = vuln.get("epss_percentile")
238-
return finding
239-
240-
241-
def get_item_v2(item, test):
242-
vulnerability_id = item["name"]
243-
file_path = item["file"]
244-
url = item.get("url")
245-
severity = severity_of(float(item["score"]))
246-
description = item.get("description")
247-
solution = item.get("solution")
248-
fix_version = item.get("fix_version")
249-
if solution:
250-
mitigation = solution
251-
elif fix_version:
252-
mitigation = "Upgrade to " + str(fix_version)
253-
else:
254-
mitigation = "No known mitigation"
255-
256-
finding = Finding(
257-
title=str(vulnerability_id) + ": " + str(file_path),
258-
description=description,
259-
url=url,
260-
cwe=0,
261-
test=test,
262-
severity=severity,
263-
impact=severity,
264-
mitigation=mitigation,
265-
)
266-
finding.unsaved_vulnerability_ids = [vulnerability_id]
267-
268-
return finding
269-
270-
271-
def get_item_sensitive_data(resource, sensitive_item, test):
272-
resource_name = resource.get("name", "None")
273-
resource_path = resource.get("path", "None")
274-
vulnerability_id = resource_name
275-
description = "**Senstive Item:** " + sensitive_item + "\n"
276-
description += "**Layer:** " + resource.get("layer", "None") + "\n"
277-
description += "**Layer_Digest:** " + resource.get("layer_digest", "None") + "\n"
278-
description += "**Path:** " + resource.get("path", "None") + "\n"
279-
finding = Finding(
280-
title=vulnerability_id
281-
+ " - "
282-
+ resource_name
283-
+ " ("
284-
+ resource_path
285-
+ ") ",
286-
test=test,
287-
severity="Info",
288-
description=description.strip(),
289-
component_name=resource.get("name"),
290-
)
291-
if vulnerability_id != "No CVE":
233+
def get_item_v2(self, item, test):
234+
vulnerability_id = item["name"]
235+
file_path = item["file"]
236+
url = item.get("url")
237+
severity = self.severity_of(float(item["score"]))
238+
description = item.get("description")
239+
solution = item.get("solution")
240+
fix_version = item.get("fix_version")
241+
if solution:
242+
mitigation = solution
243+
elif fix_version:
244+
mitigation = "Upgrade to " + str(fix_version)
245+
else:
246+
mitigation = "No known mitigation"
247+
finding = Finding(
248+
title=str(vulnerability_id) + ": " + str(file_path),
249+
description=description,
250+
url=url,
251+
cwe=0,
252+
test=test,
253+
severity=severity,
254+
impact=severity,
255+
mitigation=mitigation,
256+
)
292257
finding.unsaved_vulnerability_ids = [vulnerability_id]
258+
return finding
293259

294-
return finding
295-
296-
297-
def aqua_severity_of(score):
298-
if score == "high":
299-
return "High"
300-
if score == "medium":
301-
return "Medium"
302-
if score == "low":
303-
return "Low"
304-
if score == "negligible":
305-
return "Info"
306-
return "Critical"
307-
260+
def get_item_sensitive_data(self, resource, sensitive_item, test):
261+
resource_name = resource.get("name", "None")
262+
resource_path = resource.get("path", "None")
263+
vulnerability_id = resource_name
264+
description = "**Senstive Item:** " + sensitive_item + "\n"
265+
description += "**Layer:** " + resource.get("layer", "None") + "\n"
266+
description += "**Layer_Digest:** " + resource.get("layer_digest", "None") + "\n"
267+
description += "**Path:** " + resource.get("path", "None") + "\n"
268+
finding = Finding(
269+
title=vulnerability_id
270+
+ " - "
271+
+ resource_name
272+
+ " ("
273+
+ resource_path
274+
+ ") ",
275+
test=test,
276+
severity="Info",
277+
description=description.strip(),
278+
component_name=resource.get("name"),
279+
)
280+
if vulnerability_id != "No CVE":
281+
finding.unsaved_vulnerability_ids = [vulnerability_id]
282+
return finding
308283

309-
def severity_of(score):
310-
if score == 0:
311-
return "Info"
312-
if score < 4:
313-
return "Low"
314-
if 4.0 < score < 7.0:
315-
return "Medium"
316-
if 7.0 < score < 9.0:
317-
return "High"
318-
return "Critical"
284+
def severity_of(self, score):
285+
if isinstance(score, str):
286+
if score == "high":
287+
return "High"
288+
if score == "medium":
289+
return "Medium"
290+
if score == "low":
291+
return "Low"
292+
if score == "negligible":
293+
return "Info"
294+
return "Critical"
295+
if score == 0:
296+
return "Info"
297+
if score < 4:
298+
return "Low"
299+
if 4.0 < score < 7.0:
300+
return "Medium"
301+
if 7.0 < score < 9.0:
302+
return "High"
303+
return "Critical"

unittests/tools/test_aqua_parser.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,16 @@ def test_aqua_parser_has_one_finding(self):
2828
self.assertEqual("1.1.20-r4", finding.component_version)
2929
self.assertEqual(1, len(finding.unsaved_vulnerability_ids))
3030
self.assertEqual("CVE-2019-14697", finding.unsaved_vulnerability_ids[0])
31+
finding_severity_justification = """
32+
Aqua severity classification: None
33+
Aqua scoring system: CVSS V2
34+
Aqua score: 7.5
35+
Vendor score: 7.5
36+
NVD v3 vectors: CVSS:3.0/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H
37+
NVD v2 vectors: AV:N/AC:L/Au:N/C:P/I:P/A:P
38+
Aqua severity (high) used for classification.
39+
"""
40+
self.assertEqual(finding_severity_justification, finding.severity_justification)
3141

3242
def test_aqua_parser_has_many_findings(self):
3343
with open(get_unit_tests_scans_path("aqua") / "many_vulns.json", encoding="utf-8") as testfile:

0 commit comments

Comments
 (0)