Skip to content

Wazuh: Add separate parser 4.7 & 4.8 #12841

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions dojo/settings/settings.dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -1348,6 +1348,8 @@ def saml2_attrib_map_format(din):
"Qualys Hacker Guardian Scan": ["title", "severity", "description"],
"Cyberwatch scan (Galeax)": ["title", "description", "severity"],
"Cycognito Scan": ["title", "severity"],
"Wazuh =< 4.7 Scan": ["title", "description", "severity"],
"Wazuh >= 4.8 Scan": ["title", "description", "severity"],
}

# Override the hardcoded settings here via the env var
Expand Down Expand Up @@ -1413,12 +1415,13 @@ def saml2_attrib_map_format(din):
"Wpscan": True,
"Rusty Hog Scan": True,
"Codechecker Report native": True,
"Wazuh": True,
"Nuclei Scan": True,
"Threagile risks report": True,
"HCL AppScan on Cloud SAST XML": True,
"AWS Inspector2 Scan": True,
"Cyberwatch scan (Galeax)": True,
"Wazuh >= 4.8 Scan": True,
"Wazuh =< 4.7 Scan": True,
}

# List of fields that are known to be usable in hash_code computation)
Expand Down Expand Up @@ -1578,7 +1581,6 @@ def saml2_attrib_map_format(din):
"kube-bench Scan": DEDUPE_ALGO_HASH_CODE,
"Threagile risks report": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE,
"Humble Json Importer": DEDUPE_ALGO_HASH_CODE,
"Wazuh Scan": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL,
"MSDefender Parser": DEDUPE_ALGO_HASH_CODE,
"HCLAppScan XML": DEDUPE_ALGO_HASH_CODE,
"HCL AppScan on Cloud SAST XML": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE,
Expand All @@ -1605,6 +1607,8 @@ def saml2_attrib_map_format(din):
"Red Hat Satellite": DEDUPE_ALGO_HASH_CODE,
"Qualys Hacker Guardian Scan": DEDUPE_ALGO_HASH_CODE,
"Cyberwatch scan (Galeax)": DEDUPE_ALGO_HASH_CODE,
"Wazuh >= 4.8 Scan": DEDUPE_ALGO_HASH_CODE,
"Wazuh =< 4.7 Scan": DEDUPE_ALGO_HASH_CODE,
}

# Override the hardcoded settings here via the env var
Expand Down
95 changes: 0 additions & 95 deletions dojo/tools/wazuh/parser.py

This file was deleted.

File renamed without changes.
105 changes: 105 additions & 0 deletions dojo/tools/wazuh_indexer/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import json
from datetime import datetime

from dojo.models import Endpoint, Finding


class WazuhIndexerParser:
def get_scan_types(self):
return ["Wazuh >= 4.8 Scan"]

def get_label_for_scan_types(self, scan_type):
return "Wazuh >= 4.8 Scan"

def get_description_for_scan_types(self, scan_type):
return "Wazuh Vulnerability Data >= 4.8 from indexer in JSON format. See the documentation for search a script to obtain a clear output."

def get_findings(self, file, test):
data = json.load(file)

if not data:
return []

findings = []

vulnerabilities = data.get("hits", {}).get("hits", [])
for item_source in vulnerabilities:

item = item_source.get("_source")

# Get all vulnerability data
vuln = item.get("vulnerability")

description = vuln.get("description")
cve = vuln.get("id")
published_date = datetime.fromisoformat(vuln["published_at"]).date()
references = vuln.get("reference")
severity = vuln.get("severity")
if severity not in {"Critical", "High", "Medium", "Low"}:
severity = "Info"

if vuln.get("score"):
cvss_score = vuln.get("score").get("base")
cvss_version = vuln.get("score").get("version")
cvss3 = cvss_version.split(".")[0]

# Agent is equal to the endpoint
agent = item.get("agent")

agent_id = agent.get("id")
agent_name = agent.get("name")
# agent_ip = agent.get("ip") Maybe... will introduce it in the news versions of Wazuh?

description = (
f"Agent Name/ID: {agent_name} / {agent_id}\n"
f"{description}"
)

# Package in Wazuh is equivalent to "component" in DD
package = item.get("package")

package_name = package.get("name")
package_version = package.get("version")
package_description = package.get("description")
# Only get this field on some Windows agents.
package_path = package.get("path", None)

# Get information about OS from agent.
# This will use for severity justification
info_os = item.get("host")
if info_os and info_os.get("os"):
name_os = info_os.get("os").get("full", "N/A")
kernel_os = info_os.get("os").get("kernel", "N/A")

title = f"{cve} Affects {package_name} (Version: {package_version})"
severity_justification = (
f"Severity: {severity}\n"
f"CVSS Score: {cvss_score}\n"
f"CVSS Version: {cvss_version}\n"
f"\nOS: {name_os}\n"
f"Kernel: {kernel_os}\n\n"
f"Package Name: {package_name}\n"
f"Package Description: {package_description}"
)

finding = Finding(
title=title,
test=test,
description=description,
severity_justification=severity_justification,
severity=severity,
references=references,
dynamic_finding=True,
static_finding=False,
component_name=package_name,
component_version=package_version,
file_path=package_path or None,
publish_date=published_date,
cvssv3_score=cvss_score if cvss3 == "3" else None,
)

finding.unsaved_vulnerability_ids = [cve]
finding.unsaved_endpoints = [Endpoint(host=agent_name)]
findings.append(finding)

return findings
Empty file.
87 changes: 87 additions & 0 deletions dojo/tools/wazuh_legacy/parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import json

from dojo.models import Endpoint, Finding


class WazuhLegacyParser:

"""
The vulnerabilities with condition "Package unfixed" are skipped because there is no fix out yet.
https://github.com/wazuh/wazuh/issues/14560
"""

"""
Parser used for the Wazuh Detector module used in older versions of 4.7 and below (before Vulnerability Detection refactor).
https://github.com/wazuh/wazuh/releases/tag/v4.8.0
"""

def get_scan_types(self):
return ["Wazuh =< 4.7 Scan"]

def get_label_for_scan_types(self, scan_type):
return "Wazuh =< 4.7 Scan"

def get_description_for_scan_types(self, scan_type):
return "Wazuh =< 4.7 Scan. See the documentation for search a script to obtain a clear output."

def get_findings(self, file, test):
data = json.load(file)

if not data:
return []

findings = []

# Loop through each element in the list
vulnerabilities = data.get("data", {}).get("affected_items", [])
for item in vulnerabilities:
if (
item["condition"] != "Package unfixed"
and item["severity"] != "Untriaged"
):
cve = item.get("cve")
package_name = item.get("name")
package_version = item.get("version")
description = item.get("condition")
severity = item.get("severity").capitalize()
agent_ip = item.get("agent_ip")
links = item.get("external_references")
cvssv3_score = item.get("cvss3_score")
publish_date = item.get("published")
agent_name = item.get("agent_name")
agent_ip = item.get("agent_ip")
detection_time = item.get("detection_time").split("T")[0]

references = "\n".join(links) if links else None

title = (
item.get("title") + " (version: " + package_version + ")"
)

find = Finding(
title=title,
test=test,
description=description,
severity=severity,
references=references,
dynamic_finding=True,
static_finding=False,
component_name=package_name,
component_version=package_version,
cvssv3_score=cvssv3_score,
publish_date=publish_date,
date=detection_time,
)

# in some cases the agent_ip is not the perfect way on how to identify a host. Thus prefer the agent_name, if existant.
if agent_name:
find.unsaved_endpoints = [Endpoint(host=agent_name)]
elif agent_ip:
find.unsaved_endpoints = [Endpoint(host=agent_ip)]

if cve:
find.unsaved_vulnerability_ids = [cve]

findings.append(find)

return findings
Loading
Loading