10
10
11
11
12
12
class ProwlerParser :
13
-
14
13
"""
15
14
A parser for Prowler scan results.
16
15
Supports both CSV and OCSF JSON formats for AWS, Azure, GCP, and Kubernetes.
@@ -36,9 +35,6 @@ def get_findings(self, file, test):
36
35
# Get file name/path to determine file type
37
36
file_name = getattr (file , "name" , "" )
38
37
39
- # Special handling for test files
40
- is_test = file_name and "/scans/prowler/" in file_name
41
-
42
38
# Determine file type based on extension
43
39
if file_name .lower ().endswith (".json" ):
44
40
data = self ._parse_json (content )
@@ -55,109 +51,6 @@ def get_findings(self, file, test):
55
51
csv_data = self ._parse_csv (content )
56
52
findings = self ._parse_csv_findings (csv_data , test , file_name = file_name )
57
53
58
- # Special handling for test files to ensure consistent test results
59
- if is_test :
60
- # Test files need specific output values
61
- if "aws.json" in file_name :
62
- # AWS JSON - get MFA finding or first finding
63
- mfa_findings = [f for f in findings if "Hardware MFA" in f .title ]
64
- if mfa_findings :
65
- findings = [mfa_findings [0 ]]
66
- findings [0 ].title = "Hardware MFA is not enabled for the root account."
67
- findings [0 ].vuln_id_from_tool = "iam_root_hardware_mfa_enabled"
68
- findings [0 ].severity = "High"
69
- findings [0 ].unsaved_tags = ["aws" ]
70
- elif findings :
71
- findings = [findings [0 ]]
72
-
73
- elif "aws.csv" in file_name :
74
- # AWS CSV - get MFA finding or first finding
75
- mfa_findings = [
76
- f
77
- for f in findings
78
- if "hardware MFA" in f .title .lower ()
79
- or "iam_root_hardware_mfa_enabled" in (f .vuln_id_from_tool or "" ).lower ()
80
- ]
81
- if mfa_findings :
82
- findings = [mfa_findings [0 ]]
83
- findings [
84
- 0
85
- ].title = "iam_root_hardware_mfa_enabled: Ensure hardware MFA is enabled for the root account"
86
- findings [0 ].vuln_id_from_tool = "iam_root_hardware_mfa_enabled"
87
- findings [0 ].severity = "High"
88
- findings [0 ].unsaved_tags = ["AWS" , "iam" ]
89
- elif findings :
90
- findings = [findings [0 ]]
91
-
92
- elif "azure.json" in file_name :
93
- # Azure JSON - ensure exactly ONE finding
94
- network_findings = [f for f in findings if "Network policy" in f .title ]
95
- if network_findings :
96
- findings = [network_findings [0 ]]
97
- findings [
98
- 0
99
- ].title = (
100
- "Network policy is enabled for cluster '<resource_name>' in subscription '<account_name>'."
101
- )
102
- findings [0 ].vuln_id_from_tool = "aks_network_policy_enabled"
103
- findings [0 ].severity = "Medium"
104
- findings [0 ].active = False
105
- findings [0 ].unsaved_tags = ["azure" ]
106
- elif findings :
107
- findings = [findings [0 ]]
108
-
109
- elif "gcp.json" in file_name :
110
- # GCP JSON - ensure RDP finding
111
- rdp_findings = [f for f in findings if "rdp" in f .title .lower () or "firewall" in f .title .lower ()]
112
- if rdp_findings :
113
- findings = [rdp_findings [0 ]]
114
- findings [0 ].title = "Firewall rule default-allow-rdp allows 0.0.0.0/0 on port RDP."
115
- findings [0 ].vuln_id_from_tool = "bc_gcp_networking_2"
116
- findings [0 ].severity = "High"
117
- findings [0 ].active = True
118
- findings [0 ].unsaved_tags = ["gcp" ]
119
- elif findings :
120
- findings = [findings [0 ]]
121
-
122
- elif "gcp.csv" in file_name :
123
- # GCP CSV - ensure RDP finding
124
- rdp_findings = [f for f in findings if "rdp" in f .title .lower () or "firewall" in f .title .lower ()]
125
- if rdp_findings :
126
- findings = [rdp_findings [0 ]]
127
- findings [0 ].title = "bc_gcp_networking_2: Ensure That RDP Access Is Restricted From the Internet"
128
- findings [0 ].vuln_id_from_tool = "bc_gcp_networking_2"
129
- findings [0 ].severity = "High"
130
- findings [0 ].active = True
131
- findings [0 ].unsaved_tags = ["GCP" , "firewall" ]
132
- elif findings :
133
- findings = [findings [0 ]]
134
-
135
- elif "kubernetes.csv" in file_name :
136
- # Kubernetes CSV - ensure AlwaysPullImages finding
137
- plugin_findings = [f for f in findings if "AlwaysPullImages" in f .title ]
138
- if plugin_findings :
139
- findings = [plugin_findings [0 ]]
140
- findings [
141
- 0
142
- ].title = "bc_k8s_pod_security_1: Ensure that admission control plugin AlwaysPullImages is set"
143
- findings [0 ].vuln_id_from_tool = "bc_k8s_pod_security_1"
144
- findings [0 ].severity = "Medium"
145
- if "cluster-security" not in findings [0 ].unsaved_tags :
146
- findings [0 ].unsaved_tags .append ("cluster-security" )
147
- elif findings :
148
- findings = [findings [0 ]]
149
-
150
- elif "kubernetes.json" in file_name :
151
- # Keep only the first two findings for kubernetes.json
152
- findings = findings [:2 ]
153
- # Update AlwaysPullImages finding ID
154
- for finding in findings :
155
- if "AlwaysPullImages" in finding .title :
156
- finding .vuln_id_from_tool = "bc_k8s_pod_security_1"
157
- elif findings :
158
- # Default - limit to one finding for any other test file
159
- findings = findings [:1 ]
160
-
161
54
return findings
162
55
163
56
def _parse_json (self , content ):
0 commit comments