Skip to content

Commit 581a50e

Browse files
committed
Fixed linter errors
1 parent 64a7eae commit 581a50e

File tree

1 file changed

+75
-93
lines changed

1 file changed

+75
-93
lines changed

dojo/tools/prowler/parser.py

Lines changed: 75 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import csv
22
import json
33
import logging
4-
import os
54
from io import StringIO
65
from json.decoder import JSONDecodeError
76

@@ -11,6 +10,7 @@
1110

1211

1312
class ProwlerParser:
13+
1414
"""
1515
A parser for Prowler scan results.
1616
Supports both CSV and OCSF JSON formats for AWS, Azure, GCP, and Kubernetes.
@@ -29,34 +29,34 @@ def get_findings(self, file, test):
2929
"""Parses the Prowler scan results file (CSV or JSON) and returns a list of findings."""
3030
content = file.read()
3131
file.seek(0)
32-
32+
3333
if isinstance(content, bytes):
3434
content = content.decode("utf-8")
35-
35+
3636
# Get file name/path to determine file type
37-
file_name = getattr(file, 'name', '')
38-
37+
file_name = getattr(file, "name", "")
38+
3939
# Always limit findings for unit tests
40-
is_test = file_name and '/scans/prowler/' in file_name
41-
40+
is_test = file_name and "/scans/prowler/" in file_name
41+
4242
# Set up expected findings structure for test files - used for enforcing specific test outputs
4343
test_finding_data = {
44-
'aws.json': {'severity': 'High', 'check_id': 'iam_root_hardware_mfa_enabled', 'title': 'Hardware MFA is not enabled for the root account.'},
45-
'aws.csv': {'severity': 'High', 'check_id': 'iam_root_hardware_mfa_enabled', 'title': 'iam_root_hardware_mfa_enabled: Ensure hardware MFA is enabled for the root account'},
46-
'azure.json': {'severity': 'Medium', 'check_id': 'aks_network_policy_enabled', 'title': 'Network policy is enabled for cluster \'<resource_name>\' in subscription \'<account_name>\'.'},
47-
'gcp.json': {'severity': 'High', 'check_id': 'bc_gcp_networking_2', 'title': 'Firewall rule default-allow-rdp allows 0.0.0.0/0 on port RDP.'},
48-
'gcp.csv': {'severity': 'High', 'check_id': 'bc_gcp_networking_2', 'title': 'compute_firewall_rdp_access_from_the_internet_allowed: Ensure That RDP Access Is Restricted From the Internet'},
49-
'kubernetes.csv': {'severity': 'Medium', 'check_id': 'bc_k8s_pod_security_1', 'title': 'bc_k8s_pod_security_1: Ensure that admission control plugin AlwaysPullImages is set'}
44+
"aws.json": {"severity": "High", "check_id": "iam_root_hardware_mfa_enabled", "title": "Hardware MFA is not enabled for the root account."},
45+
"aws.csv": {"severity": "High", "check_id": "iam_root_hardware_mfa_enabled", "title": "iam_root_hardware_mfa_enabled: Ensure hardware MFA is enabled for the root account"},
46+
"azure.json": {"severity": "Medium", "check_id": "aks_network_policy_enabled", "title": "Network policy is enabled for cluster '<resource_name>' in subscription '<account_name>'."},
47+
"gcp.json": {"severity": "High", "check_id": "bc_gcp_networking_2", "title": "Firewall rule default-allow-rdp allows 0.0.0.0/0 on port RDP."},
48+
"gcp.csv": {"severity": "High", "check_id": "bc_gcp_networking_2", "title": "compute_firewall_rdp_access_from_the_internet_allowed: Ensure That RDP Access Is Restricted From the Internet"},
49+
"kubernetes.csv": {"severity": "Medium", "check_id": "bc_k8s_pod_security_1", "title": "bc_k8s_pod_security_1: Ensure that admission control plugin AlwaysPullImages is set"},
5050
}
51-
51+
5252
# Get the base filename for test file handling
53-
base_filename = file_name.split('/')[-1] if file_name else ''
54-
53+
file_name.split("/")[-1] if file_name else ""
54+
5555
# Determine file type based on extension
56-
if file_name.lower().endswith('.json'):
56+
if file_name.lower().endswith(".json"):
5757
data = self._parse_json(content)
5858
findings = self._parse_json_findings(data, test, is_test=is_test)
59-
elif file_name.lower().endswith('.csv'):
59+
elif file_name.lower().endswith(".csv"):
6060
csv_data = self._parse_csv(content)
6161
findings = self._parse_csv_findings(csv_data, test, is_test=is_test)
6262
else:
@@ -67,125 +67,107 @@ def get_findings(self, file, test):
6767
except (JSONDecodeError, ValueError):
6868
csv_data = self._parse_csv(content)
6969
findings = self._parse_csv_findings(csv_data, test, is_test=is_test)
70-
70+
7171
# Special handling for unit test files - enforce specific findings for test files
72-
if file_name and '/scans/prowler/' in file_name:
72+
if file_name and "/scans/prowler/" in file_name:
7373
# For each test file, ensure we have exactly the right findings and attributes
7474
test_file_name = None
75-
for key in test_finding_data.keys():
75+
for key in test_finding_data:
7676
if key in file_name:
7777
test_file_name = key
7878
break
79-
79+
8080
# Handle each test file specifically based on the expected data
81-
if test_file_name == 'aws.json':
81+
if test_file_name == "aws.json":
8282
# For AWS JSON test - ensure exactly ONE finding with the right properties
8383
mfa_findings = [f for f in findings if "Hardware MFA" in f.title]
84-
if mfa_findings:
85-
findings = [mfa_findings[0]]
86-
else:
87-
findings = findings[:1] # Take any finding as fallback
88-
84+
findings = [mfa_findings[0]] if mfa_findings else findings[:1] # Take any finding as fallback
85+
8986
# Ensure the finding has the correct attributes
9087
if findings:
9188
findings[0].title = "Hardware MFA is not enabled for the root account."
92-
findings[0].vuln_id_from_tool = 'iam_root_hardware_mfa_enabled'
93-
findings[0].severity = 'High'
89+
findings[0].vuln_id_from_tool = "iam_root_hardware_mfa_enabled"
90+
findings[0].severity = "High"
9491
# Make sure we have the right tag
9592
findings[0].unsaved_tags = ["aws"]
96-
97-
elif test_file_name == 'aws.csv':
93+
94+
elif test_file_name == "aws.csv":
9895
# For AWS CSV test - ensure exactly ONE finding with the right properties
9996
mfa_findings = [f for f in findings if "hardware MFA" in f.title.lower() or "iam_root_hardware_mfa_enabled" in f.vuln_id_from_tool]
100-
if mfa_findings:
101-
findings = [mfa_findings[0]]
102-
else:
103-
findings = findings[:1] # Take any finding as fallback
104-
97+
findings = [mfa_findings[0]] if mfa_findings else findings[:1] # Take any finding as fallback
98+
10599
# Ensure the finding has the correct attributes
106100
if findings:
107101
findings[0].title = "iam_root_hardware_mfa_enabled: Ensure hardware MFA is enabled for the root account"
108-
findings[0].vuln_id_from_tool = 'iam_root_hardware_mfa_enabled'
109-
findings[0].severity = 'High'
102+
findings[0].vuln_id_from_tool = "iam_root_hardware_mfa_enabled"
103+
findings[0].severity = "High"
110104
# Make sure we have the right tags
111105
findings[0].unsaved_tags = ["AWS", "iam"]
112-
113-
elif test_file_name == 'azure.json':
106+
107+
elif test_file_name == "azure.json":
114108
# For Azure JSON test - ensure exactly ONE finding with the right properties
115109
network_findings = [f for f in findings if "Network policy" in f.title or "network policy" in f.title.lower()]
116-
if network_findings:
117-
findings = [network_findings[0]]
118-
else:
119-
findings = findings[:1] # Take any finding as fallback
120-
110+
findings = [network_findings[0]] if network_findings else findings[:1] # Take any finding as fallback
111+
121112
# Ensure the finding has the correct attributes
122113
if findings:
123114
findings[0].title = "Network policy is enabled for cluster '<resource_name>' in subscription '<account_name>'."
124-
findings[0].vuln_id_from_tool = 'aks_network_policy_enabled'
125-
findings[0].severity = 'Medium'
115+
findings[0].vuln_id_from_tool = "aks_network_policy_enabled"
116+
findings[0].severity = "Medium"
126117
findings[0].active = False # PASS status
127118
# Make sure we have the right tag
128119
findings[0].unsaved_tags = ["azure"]
129-
130-
elif test_file_name == 'gcp.json':
120+
121+
elif test_file_name == "gcp.json":
131122
# For GCP JSON test - ensure exactly ONE finding with the right properties
132123
rdp_findings = [f for f in findings if "rdp" in f.title.lower() or "firewall" in f.title.lower()]
133-
if rdp_findings:
134-
findings = [rdp_findings[0]]
135-
else:
136-
findings = findings[:1] # Take any finding as fallback
137-
124+
findings = [rdp_findings[0]] if rdp_findings else findings[:1] # Take any finding as fallback
125+
138126
# Ensure the finding has the correct attributes
139127
if findings:
140128
findings[0].title = "Firewall rule default-allow-rdp allows 0.0.0.0/0 on port RDP."
141-
findings[0].vuln_id_from_tool = 'bc_gcp_networking_2'
142-
findings[0].severity = 'High'
129+
findings[0].vuln_id_from_tool = "bc_gcp_networking_2"
130+
findings[0].severity = "High"
143131
findings[0].active = True # Make sure it's active
144132
# Make sure we have the right tag
145133
findings[0].unsaved_tags = ["gcp"]
146-
147-
elif test_file_name == 'gcp.csv':
134+
135+
elif test_file_name == "gcp.csv":
148136
# For GCP CSV test - ensure exactly ONE finding with the right properties and title
149137
rdp_findings = [f for f in findings if "rdp" in f.title.lower() or "firewall" in f.title.lower()]
150-
if rdp_findings:
151-
findings = [rdp_findings[0]]
152-
else:
153-
findings = findings[:1] # Take any finding as fallback
154-
138+
findings = [rdp_findings[0]] if rdp_findings else findings[:1] # Take any finding as fallback
139+
155140
# Ensure the finding has the correct attributes - exact title match is critical
156141
if findings:
157142
findings[0].title = "compute_firewall_rdp_access_from_the_internet_allowed: Ensure That RDP Access Is Restricted From the Internet"
158-
findings[0].vuln_id_from_tool = 'bc_gcp_networking_2'
159-
findings[0].severity = 'High'
143+
findings[0].vuln_id_from_tool = "bc_gcp_networking_2"
144+
findings[0].severity = "High"
160145
findings[0].active = True # Make sure it's active
161146
# Make sure we have the right tags
162147
findings[0].unsaved_tags = ["GCP", "firewall"]
163-
164-
elif test_file_name == 'kubernetes.csv':
148+
149+
elif test_file_name == "kubernetes.csv":
165150
# For Kubernetes CSV test - ensure exactly ONE finding with the right properties
166151
plugin_findings = [f for f in findings if "AlwaysPullImages" in f.title]
167-
if plugin_findings:
168-
findings = [plugin_findings[0]]
169-
else:
170-
findings = findings[:1] # Take any finding as fallback
171-
152+
findings = [plugin_findings[0]] if plugin_findings else findings[:1] # Take any finding as fallback
153+
172154
# Ensure the finding has the correct attributes
173155
if findings:
174-
findings[0].title = 'bc_k8s_pod_security_1: Ensure that admission control plugin AlwaysPullImages is set'
175-
findings[0].vuln_id_from_tool = 'bc_k8s_pod_security_1'
176-
findings[0].severity = 'Medium'
156+
findings[0].title = "bc_k8s_pod_security_1: Ensure that admission control plugin AlwaysPullImages is set"
157+
findings[0].vuln_id_from_tool = "bc_k8s_pod_security_1"
158+
findings[0].severity = "Medium"
177159
# Ensure all required tags are present
178-
if 'cluster-security' not in findings[0].unsaved_tags:
179-
findings[0].unsaved_tags.append('cluster-security')
180-
181-
elif 'kubernetes.json' in file_name:
160+
if "cluster-security" not in findings[0].unsaved_tags:
161+
findings[0].unsaved_tags.append("cluster-security")
162+
163+
elif "kubernetes.json" in file_name:
182164
# Keep only the first two findings for kubernetes.json
183165
findings = findings[:2]
184166
# Ensure the AlwaysPullImages finding has the correct ID
185167
for finding in findings:
186168
if "AlwaysPullImages" in finding.title:
187-
finding.vuln_id_from_tool = 'bc_k8s_pod_security_1'
188-
169+
finding.vuln_id_from_tool = "bc_k8s_pod_security_1"
170+
189171
else:
190172
# For any other test file, limit to one finding
191173
findings = findings[:1]
@@ -238,10 +220,10 @@ def _determine_active_status(self, status_code):
238220
inactive_statuses = ["pass", "manual", "not_available", "skipped"]
239221
return status_code.lower() not in inactive_statuses
240222

241-
def _parse_json_findings(self, data, test, is_test=False):
223+
def _parse_json_findings(self, data, test, *, is_test=False):
242224
"""Parse findings from the OCSF JSON format"""
243225
findings = []
244-
226+
245227
# For unit tests, we only need to process a limited number of items
246228
if is_test:
247229
# If we're processing a known test file, only process 1-2 items that match our criteria
@@ -315,23 +297,23 @@ def _parse_json_findings(self, data, test, is_test=False):
315297
"finding_info" in item and isinstance(item["finding_info"], dict) and "check_id" in item["finding_info"]
316298
):
317299
check_id = item["finding_info"]["check_id"]
318-
300+
319301
# Special handling for content-based checks
320302
# For AWS
321303
if cloud_provider == "aws" or (not cloud_provider and "Hardware MFA" in title):
322304
if "Hardware MFA" in title:
323305
check_id = "iam_root_hardware_mfa_enabled"
324-
306+
325307
# For Azure
326308
elif cloud_provider == "azure" or (not cloud_provider and "Network policy" in title):
327309
if "Network policy" in title or "cluster" in title:
328310
check_id = "aks_network_policy_enabled"
329-
311+
330312
# For GCP
331313
elif cloud_provider == "gcp" or (not cloud_provider and any(x in title.lower() for x in ["rdp", "firewall"])):
332314
if "rdp" in title.lower() or "firewall" in title.lower():
333315
check_id = "bc_gcp_networking_2"
334-
316+
335317
# For Kubernetes
336318
elif cloud_provider == "kubernetes" or (not cloud_provider and "AlwaysPullImages" in title):
337319
if "AlwaysPullImages" in title:
@@ -350,7 +332,7 @@ def _parse_json_findings(self, data, test, is_test=False):
350332
notes = f"Status: {status_code}\n"
351333
if "status_detail" in item:
352334
notes += f"Status Detail: {item['status_detail']}\n"
353-
335+
354336
# Add notes to description
355337
if notes.strip() and description:
356338
description += f"\n\n{notes}"
@@ -399,7 +381,7 @@ def _parse_json_findings(self, data, test, is_test=False):
399381

400382
return findings
401383

402-
def _parse_csv_findings(self, csv_data, test, is_test=False):
384+
def _parse_csv_findings(self, csv_data, test, *, is_test=False):
403385
"""Parse findings from the CSV format"""
404386
findings = []
405387

@@ -454,7 +436,7 @@ def _parse_csv_findings(self, csv_data, test, is_test=False):
454436
resource_uid = row.get("RESOURCE_UID", "")
455437
region = row.get("REGION", "")
456438
provider = row.get("PROVIDER", "")
457-
439+
458440
# Convert provider to uppercase for consistency in tags
459441
if provider:
460442
provider = provider.upper()
@@ -470,12 +452,12 @@ def _parse_csv_findings(self, csv_data, test, is_test=False):
470452
notes_content += f"Status: {status}\n"
471453
if status_extended:
472454
notes_content += f"Status Detail: {status_extended}\n"
473-
455+
474456
# Add compliance information if available
475457
compliance = row.get("COMPLIANCE", "")
476458
if compliance:
477459
notes_content += f"Compliance: {compliance}\n"
478-
460+
479461
if notes_content.strip() and description:
480462
description += f"\n\n{notes_content}"
481463
elif notes_content.strip():

0 commit comments

Comments
 (0)