Skip to content

🐛 Implement Wazuh v4.8 #12739

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: bugfix
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 7 additions & 69 deletions dojo/tools/wazuh/parser.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import hashlib
import json

from dojo.models import Endpoint, Finding
from dojo.tools.wazuh.v4_7 import WazuhV4_7
from dojo.tools.wazuh.v4_8 import WazuhV4_8


class WazuhParser:
Expand All @@ -22,74 +22,12 @@ def get_description_for_scan_types(self, scan_type):

def get_findings(self, file, test):
data = json.load(file)

if not data:
return []

# Detect duplications
dupes = {}

# Loop through each element in the list
vulnerabilities = data.get("data", {}).get("affected_items", [])
for item in vulnerabilities:
if (
item["condition"] != "Package unfixed"
and item["severity"] != "Untriaged"
):
cve = item.get("cve")
package_name = item.get("name")
package_version = item.get("version")
description = item.get("condition")
severity = item.get("severity").capitalize()
agent_ip = item.get("agent_ip")
links = item.get("external_references")
cvssv3_score = item.get("cvss3_score")
publish_date = item.get("published")
agent_name = item.get("agent_name")
agent_ip = item.get("agent_ip")
detection_time = item.get("detection_time").split("T")[0]

references = "\n".join(links) if links else None

title = (
item.get("title") + " (version: " + package_version + ")"
)

if agent_name:
dupe_key = title + cve + agent_name + package_name + package_version
else:
dupe_key = title + cve + package_name + package_version
dupe_key = hashlib.sha256(dupe_key.encode("utf-8")).hexdigest()

if dupe_key in dupes:
find = dupes[dupe_key]
else:
dupes[dupe_key] = True

find = Finding(
title=title,
test=test,
description=description,
severity=severity,
references=references,
static_finding=True,
component_name=package_name,
component_version=package_version,
cvssv3_score=cvssv3_score,
publish_date=publish_date,
unique_id_from_tool=dupe_key,
date=detection_time,
)

# in some cases the agent_ip is not the perfect way on how to identify a host. Thus prefer the agent_name, if existant.
if agent_name:
find.unsaved_endpoints = [Endpoint(host=agent_name)]
elif agent_ip:
find.unsaved_endpoints = [Endpoint(host=agent_ip)]

if id:
find.unsaved_vulnerability_ids = cve

dupes[dupe_key] = find

return list(dupes.values())
if data.get("data"):
return WazuhV4_7().parse_findings(test, data)
if data.get("hits"):
return WazuhV4_8().parse_findings(test, data)
return []
70 changes: 70 additions & 0 deletions dojo/tools/wazuh/v4_7.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import hashlib

from dojo.models import Endpoint, Finding


class WazuhV4_7:
def parse_findings(self, test, data):
dupes = {}
vulnerabilities = data.get("data", {}).get("affected_items", [])
for item in vulnerabilities:
if (
item["condition"] != "Package unfixed"
and item["severity"] != "Untriaged"
):
cve = item.get("cve")
package_name = item.get("name")
package_version = item.get("version")
description = item.get("condition")
severity = item.get("severity").capitalize()
agent_ip = item.get("agent_ip")
links = item.get("external_references")
cvssv3_score = item.get("cvss3_score")
publish_date = item.get("published")
agent_name = item.get("agent_name")
agent_ip = item.get("agent_ip")
detection_time = item.get("detection_time").split("T")[0]

references = "\n".join(links) if links else None

title = (
item.get("title") + " (version: " + package_version + ")"
)

if agent_name:
dupe_key = title + cve + agent_name + package_name + package_version
else:
dupe_key = title + cve + package_name + package_version
dupe_key = hashlib.sha256(dupe_key.encode("utf-8")).hexdigest()

if dupe_key in dupes:
find = dupes[dupe_key]
else:
dupes[dupe_key] = True

find = Finding(
title=title,
test=test,
description=description,
severity=severity,
references=references,
static_finding=True,
component_name=package_name,
component_version=package_version,
cvssv3_score=cvssv3_score,
publish_date=publish_date,
unique_id_from_tool=dupe_key,
date=detection_time,
)

# in some cases the agent_ip is not the perfect way on how to identify a host. Thus prefer the agent_name, if existant.
if agent_name:
find.unsaved_endpoints = [Endpoint(host=agent_name)]
elif agent_ip:
find.unsaved_endpoints = [Endpoint(host=agent_ip)]

if id:
find.unsaved_vulnerability_ids = cve

dupes[dupe_key] = find
return list(dupes.values())
49 changes: 49 additions & 0 deletions dojo/tools/wazuh/v4_8.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import hashlib

from dojo.models import Finding


class WazuhV4_8:
def parse_findings(self, test, data):
dupes = {}
vulnerabilities = data.get("hits", {}).get("hits", [])
for item_source in vulnerabilities:
item = item_source.get("_source")
vuln = item.get("vulnerability")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add package object too for get package_name and package_version fields.

cve = vuln.get("id")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my case, the vulnerability ID shows only as "C" instead of the full CVE.
The remaining part of the CVE string appears under "additional vulnerability IDs".
image

Example of the field:
"id": "CVE-2020-35527",

description = vuln.get("description")
severity = vuln.get("severity")
cvssv3_score = vuln.get("score").get("base")
publish_date = vuln.get("published_at").split("T")[0]
agent_id = item.get("agent").get("id")
detection_time = vuln.get("detected_at").split("T")[0]

references = vuln.get("reference")

title = (
cve + " (agent_id: " + agent_id + ")"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the title to include package_name and package_version, like the previous syntax CVE Affects package_name ( package_version ) on agent_id.

)

dupe_key = title + agent_id + description
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I got some warnings on dedupes

uwsgi-1             | [07/Jul/2025 12:47:34] WARNING [dojo.specific-loggers.deduplication:2203] test_type name Wazuh and scan_type Wazuh not found in HASHCODE_FIELDS_PER_SCANNER
uwsgi-1             | [07/Jul/2025 12:47:34] WARNING [dojo.specific-loggers.deduplication:2203] test_type name Wazuh and scan_type Wazuh not found in HASHCODE_FIELDS_PER_SCANNER
uwsgi-1             | [07/Jul/2025 12:47:34] WARNING [dojo.specific-loggers.deduplication:2203] test_type name Wazuh and scan_type Wazuh not found in HASHCODE_FIELDS_PER_SCANNER
uwsgi-1             | [07/Jul/2025 12:47:34] WARNING [dojo.specific-loggers.deduplication:2203] test_type name Wazuh and scan_type Wazuh not found in HASHCODE_FIELDS_PER_SCANNER
uwsgi-1             | [07/Jul/2025 12:47:34] WARNING [dojo.specific-loggers.deduplication:2203] test_type name Wazuh and scan_type Wazuh not found in HASHCODE_FIELDS_PER_SCANNER
uwsgi-1             | [07/Jul/2025 12:47:34] WARNING [dojo.specific-loggers.deduplication:2203] test_type name Wazuh and scan_type Wazuh not found in HASHCODE_FIELDS_PER_SCANNER

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that the default dedupe fields are being used here

dupe_key = hashlib.sha256(dupe_key.encode("utf-8")).hexdigest()

if dupe_key in dupes:
find = dupes[dupe_key]
else:
dupes[dupe_key] = True

find = Finding(
title=title,
test=test,
description=description,
severity=severity,
references=references,
static_finding=True,
cvssv3_score=cvssv3_score,
publish_date=publish_date,
unique_id_from_tool=dupe_key,
date=detection_time,
)
find.unsaved_vulnerability_ids = cve
dupes[dupe_key] = find
return list(dupes.values())
Loading