Skip to content

Commit 99f7e41

Browse files
committed
🐛 Implement Wazuh v4.8
1 parent 269a75f commit 99f7e41

File tree

4 files changed

+773
-69
lines changed

4 files changed

+773
-69
lines changed

dojo/tools/wazuh/parser.py

Lines changed: 7 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import hashlib
21
import json
32

4-
from dojo.models import Endpoint, Finding
3+
from dojo.tools.wazuh.v4_7 import WazuhV4_7
4+
from dojo.tools.wazuh.v4_8 import WazuhV4_8
55

66

77
class WazuhParser:
@@ -22,74 +22,12 @@ def get_description_for_scan_types(self, scan_type):
2222

2323
def get_findings(self, file, test):
2424
data = json.load(file)
25-
2625
if not data:
2726
return []
2827

29-
# Detect duplications
30-
dupes = {}
31-
3228
# Loop through each element in the list
33-
vulnerabilities = data.get("data", {}).get("affected_items", [])
34-
for item in vulnerabilities:
35-
if (
36-
item["condition"] != "Package unfixed"
37-
and item["severity"] != "Untriaged"
38-
):
39-
cve = item.get("cve")
40-
package_name = item.get("name")
41-
package_version = item.get("version")
42-
description = item.get("condition")
43-
severity = item.get("severity").capitalize()
44-
agent_ip = item.get("agent_ip")
45-
links = item.get("external_references")
46-
cvssv3_score = item.get("cvss3_score")
47-
publish_date = item.get("published")
48-
agent_name = item.get("agent_name")
49-
agent_ip = item.get("agent_ip")
50-
detection_time = item.get("detection_time").split("T")[0]
51-
52-
references = "\n".join(links) if links else None
53-
54-
title = (
55-
item.get("title") + " (version: " + package_version + ")"
56-
)
57-
58-
if agent_name:
59-
dupe_key = title + cve + agent_name + package_name + package_version
60-
else:
61-
dupe_key = title + cve + package_name + package_version
62-
dupe_key = hashlib.sha256(dupe_key.encode("utf-8")).hexdigest()
63-
64-
if dupe_key in dupes:
65-
find = dupes[dupe_key]
66-
else:
67-
dupes[dupe_key] = True
68-
69-
find = Finding(
70-
title=title,
71-
test=test,
72-
description=description,
73-
severity=severity,
74-
references=references,
75-
static_finding=True,
76-
component_name=package_name,
77-
component_version=package_version,
78-
cvssv3_score=cvssv3_score,
79-
publish_date=publish_date,
80-
unique_id_from_tool=dupe_key,
81-
date=detection_time,
82-
)
83-
84-
# in some cases the agent_ip is not the perfect way on how to identify a host. Thus prefer the agent_name, if existant.
85-
if agent_name:
86-
find.unsaved_endpoints = [Endpoint(host=agent_name)]
87-
elif agent_ip:
88-
find.unsaved_endpoints = [Endpoint(host=agent_ip)]
89-
90-
if id:
91-
find.unsaved_vulnerability_ids = cve
92-
93-
dupes[dupe_key] = find
94-
95-
return list(dupes.values())
29+
if data.get("data"):
30+
return WazuhV4_7().parse_findings(test, data)
31+
if data.get("hits"):
32+
return WazuhV4_8().parse_findings(test, data)
33+
return []

dojo/tools/wazuh/v4_7.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import hashlib
2+
3+
from dojo.models import Endpoint, Finding
4+
5+
6+
class WazuhV4_7:
7+
def parse_findings(self, test, data):
8+
dupes = {}
9+
vulnerabilities = data.get("data", {}).get("affected_items", [])
10+
for item in vulnerabilities:
11+
if (
12+
item["condition"] != "Package unfixed"
13+
and item["severity"] != "Untriaged"
14+
):
15+
cve = item.get("cve")
16+
package_name = item.get("name")
17+
package_version = item.get("version")
18+
description = item.get("condition")
19+
severity = item.get("severity").capitalize()
20+
agent_ip = item.get("agent_ip")
21+
links = item.get("external_references")
22+
cvssv3_score = item.get("cvss3_score")
23+
publish_date = item.get("published")
24+
agent_name = item.get("agent_name")
25+
agent_ip = item.get("agent_ip")
26+
detection_time = item.get("detection_time").split("T")[0]
27+
28+
references = "\n".join(links) if links else None
29+
30+
title = (
31+
item.get("title") + " (version: " + package_version + ")"
32+
)
33+
34+
if agent_name:
35+
dupe_key = title + cve + agent_name + package_name + package_version
36+
else:
37+
dupe_key = title + cve + package_name + package_version
38+
dupe_key = hashlib.sha256(dupe_key.encode("utf-8")).hexdigest()
39+
40+
if dupe_key in dupes:
41+
find = dupes[dupe_key]
42+
else:
43+
dupes[dupe_key] = True
44+
45+
find = Finding(
46+
title=title,
47+
test=test,
48+
description=description,
49+
severity=severity,
50+
references=references,
51+
static_finding=True,
52+
component_name=package_name,
53+
component_version=package_version,
54+
cvssv3_score=cvssv3_score,
55+
publish_date=publish_date,
56+
unique_id_from_tool=dupe_key,
57+
date=detection_time,
58+
)
59+
60+
# in some cases the agent_ip is not the perfect way on how to identify a host. Thus prefer the agent_name, if existant.
61+
if agent_name:
62+
find.unsaved_endpoints = [Endpoint(host=agent_name)]
63+
elif agent_ip:
64+
find.unsaved_endpoints = [Endpoint(host=agent_ip)]
65+
66+
if id:
67+
find.unsaved_vulnerability_ids = cve
68+
69+
dupes[dupe_key] = find
70+
return list(dupes.values())

dojo/tools/wazuh/v4_8.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
2+
3+
class WazuhV4_8:
4+
def parse_findings(self, test, data):
5+
# dupes = {}
6+
# vulnerabilities = data.get("data", {}).get("affected_items", [])
7+
# for item in vulnerabilities:
8+
# if (
9+
# item["condition"] != "Package unfixed"
10+
# and item["severity"] != "Untriaged"
11+
# ):
12+
# cve = item.get("cve")
13+
# package_name = item.get("name")
14+
# package_version = item.get("version")
15+
# description = item.get("condition")
16+
# severity = item.get("severity").capitalize()
17+
# agent_ip = item.get("agent_ip")
18+
# links = item.get("external_references")
19+
# cvssv3_score = item.get("cvss3_score")
20+
# publish_date = item.get("published")
21+
# agent_name = item.get("agent_name")
22+
# agent_ip = item.get("agent_ip")
23+
# detection_time = item.get("detection_time").split("T")[0]
24+
25+
# references = "\n".join(links) if links else None
26+
27+
# title = (
28+
# item.get("title") + " (version: " + package_version + ")"
29+
# )
30+
31+
# if agent_name:
32+
# dupe_key = title + cve + agent_name + package_name + package_version
33+
# else:
34+
# dupe_key = title + cve + package_name + package_version
35+
# dupe_key = hashlib.sha256(dupe_key.encode("utf-8")).hexdigest()
36+
37+
# if dupe_key in dupes:
38+
# find = dupes[dupe_key]
39+
# else:
40+
# dupes[dupe_key] = True
41+
42+
# find = Finding(
43+
# title=title,
44+
# test=test,
45+
# description=description,
46+
# severity=severity,
47+
# references=references,
48+
# static_finding=True,
49+
# component_name=package_name,
50+
# component_version=package_version,
51+
# cvssv3_score=cvssv3_score,
52+
# publish_date=publish_date,
53+
# unique_id_from_tool=dupe_key,
54+
# date=detection_time,
55+
# )
56+
57+
# # in some cases the agent_ip is not the perfect way on how to identify a host. Thus prefer the agent_name, if existant.
58+
# if agent_name:
59+
# find.unsaved_endpoints = [Endpoint(host=agent_name)]
60+
# elif agent_ip:
61+
# find.unsaved_endpoints = [Endpoint(host=agent_ip)]
62+
63+
# if id:
64+
# find.unsaved_vulnerability_ids = cve
65+
66+
# dupes[dupe_key] = find
67+
return []

0 commit comments

Comments
 (0)