Skip to content

🐛 Implement Wazuh v4.8 #12739

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: bugfix
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 7 additions & 69 deletions dojo/tools/wazuh/parser.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import hashlib
import json

from dojo.models import Endpoint, Finding
from dojo.tools.wazuh.v4_7 import WazuhV4_7
from dojo.tools.wazuh.v4_8 import WazuhV4_8


class WazuhParser:
Expand All @@ -22,74 +22,12 @@ def get_description_for_scan_types(self, scan_type):

def get_findings(self, file, test):
data = json.load(file)

if not data:
return []

# Detect duplications
dupes = {}

# Loop through each element in the list
vulnerabilities = data.get("data", {}).get("affected_items", [])
for item in vulnerabilities:
if (
item["condition"] != "Package unfixed"
and item["severity"] != "Untriaged"
):
cve = item.get("cve")
package_name = item.get("name")
package_version = item.get("version")
description = item.get("condition")
severity = item.get("severity").capitalize()
agent_ip = item.get("agent_ip")
links = item.get("external_references")
cvssv3_score = item.get("cvss3_score")
publish_date = item.get("published")
agent_name = item.get("agent_name")
agent_ip = item.get("agent_ip")
detection_time = item.get("detection_time").split("T")[0]

references = "\n".join(links) if links else None

title = (
item.get("title") + " (version: " + package_version + ")"
)

if agent_name:
dupe_key = title + cve + agent_name + package_name + package_version
else:
dupe_key = title + cve + package_name + package_version
dupe_key = hashlib.sha256(dupe_key.encode("utf-8")).hexdigest()

if dupe_key in dupes:
find = dupes[dupe_key]
else:
dupes[dupe_key] = True

find = Finding(
title=title,
test=test,
description=description,
severity=severity,
references=references,
static_finding=True,
component_name=package_name,
component_version=package_version,
cvssv3_score=cvssv3_score,
publish_date=publish_date,
unique_id_from_tool=dupe_key,
date=detection_time,
)

# in some cases the agent_ip is not the perfect way on how to identify a host. Thus prefer the agent_name, if existant.
if agent_name:
find.unsaved_endpoints = [Endpoint(host=agent_name)]
elif agent_ip:
find.unsaved_endpoints = [Endpoint(host=agent_ip)]

if id:
find.unsaved_vulnerability_ids = cve

dupes[dupe_key] = find

return list(dupes.values())
if data.get("data"):
return WazuhV4_7().parse_findings(test, data)
if data.get("hits"):
return WazuhV4_8().parse_findings(test, data)
return []
70 changes: 70 additions & 0 deletions dojo/tools/wazuh/v4_7.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import hashlib

from dojo.models import Endpoint, Finding


class WazuhV4_7:
def parse_findings(self, test, data):
dupes = {}
vulnerabilities = data.get("data", {}).get("affected_items", [])
for item in vulnerabilities:
if (
item["condition"] != "Package unfixed"
and item["severity"] != "Untriaged"
):
cve = item.get("cve")
package_name = item.get("name")
package_version = item.get("version")
description = item.get("condition")
severity = item.get("severity").capitalize()
agent_ip = item.get("agent_ip")
links = item.get("external_references")
cvssv3_score = item.get("cvss3_score")
publish_date = item.get("published")
agent_name = item.get("agent_name")
agent_ip = item.get("agent_ip")
detection_time = item.get("detection_time").split("T")[0]

references = "\n".join(links) if links else None

title = (
item.get("title") + " (version: " + package_version + ")"
)

if agent_name:
dupe_key = title + cve + agent_name + package_name + package_version
else:
dupe_key = title + cve + package_name + package_version
dupe_key = hashlib.sha256(dupe_key.encode("utf-8")).hexdigest()

if dupe_key in dupes:
find = dupes[dupe_key]
else:
dupes[dupe_key] = True

find = Finding(
title=title,
test=test,
description=description,
severity=severity,
references=references,
static_finding=True,
component_name=package_name,
component_version=package_version,
cvssv3_score=cvssv3_score,
publish_date=publish_date,
unique_id_from_tool=dupe_key,
date=detection_time,
)

# in some cases the agent_ip is not the perfect way on how to identify a host. Thus prefer the agent_name, if existant.
if agent_name:
find.unsaved_endpoints = [Endpoint(host=agent_name)]
elif agent_ip:
find.unsaved_endpoints = [Endpoint(host=agent_ip)]

if id:
find.unsaved_vulnerability_ids = cve

dupes[dupe_key] = find
return list(dupes.values())
49 changes: 49 additions & 0 deletions dojo/tools/wazuh/v4_8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import hashlib

from dojo.models import Finding


class WazuhV4_8:
def parse_findings(self, test, data):
dupes = {}
vulnerabilities = data.get("hits", {}).get("hits", [])
for item_source in vulnerabilities:
item = item_source.get("_source")
vuln = item.get("vulnerability")
cve = vuln.get("id")
description = vuln.get("description")
severity = vuln.get("severity")
cvssv3_score = vuln.get("score").get("base")
publish_date = vuln.get("published_at").split("T")[0]
agent_id = item.get("agent").get("id")
detection_time = vuln.get("detected_at").split("T")[0]

references = vuln.get("reference")

title = (
cve + " (agent_id: " + agent_id + ")"
)

dupe_key = title + agent_id + description
dupe_key = hashlib.sha256(dupe_key.encode("utf-8")).hexdigest()

if dupe_key in dupes:
find = dupes[dupe_key]
else:
dupes[dupe_key] = True

find = Finding(
title=title,
test=test,
description=description,
severity=severity,
references=references,
static_finding=True,
cvssv3_score=cvssv3_score,
publish_date=publish_date,
unique_id_from_tool=dupe_key,
date=detection_time,
)
find.unsaved_vulnerability_ids = cve
dupes[dupe_key] = find
return list(dupes.values())
Loading