@@ -12,7 +12,15 @@ class ProwlerParser:
12
12
13
13
"""
14
14
A parser for Prowler scan results.
15
- Supports both CSV and OCSF JSON formats for AWS, Azure, GCP, and Kubernetes.
15
+ Supports both CSV and OCSF JSON formats for AWS, Azure, GCP, and Kubernetes.
16
24
"""
17
25
18
26
def get_scan_types (self ):
@@ -43,20 +51,19 @@ def get_findings(self, file, test):
43
51
csv_data = self ._parse_csv (content )
44
52
findings = self ._parse_csv_findings (csv_data , test , file_name = file_name )
45
53
else :
46
- # If file type can't be determined from extension, throw an error
47
- error_message = f"Unsupported file format. Prowler parser only supports JSON and CSV files. File name: { file_name } "
48
- raise ValueError (error_message )
54
+ # If file type can't be determined from extension
55
+ error_msg = f"Unsupported file format. Prowler parser only supports JSON and CSV files. File name: { file_name } "
56
+ logger .error (f"Unsupported file format for Prowler parser: { file_name } " )
57
+ raise ValueError (error_msg )
49
58
50
59
return findings
51
60
52
61
def _parse_json (self , content ):
53
62
"""Safely parse JSON content"""
54
- # Content is already decoded in get_findings method
55
63
return json .loads (content )
56
64
57
65
def _parse_csv (self , content ):
58
66
"""Parse CSV content"""
59
- # Content is already decoded in get_findings method
60
67
f = StringIO (content )
61
68
csv_reader = csv .DictReader (f , delimiter = ";" )
62
69
results = list (csv_reader )
@@ -89,7 +96,8 @@ def _determine_active_status(self, status_code):
89
96
if not status_code :
90
97
return True
91
98
92
- inactive_statuses = ["pass" , "manual" , "not_available" , "skipped" ]
99
+ # Using a set for O(1) lookup performance
100
+ inactive_statuses = {"pass" , "manual" , "not_available" , "skipped" }
93
101
return status_code .lower () not in inactive_statuses
94
102
95
103
def _parse_json_findings (self , data , test , * , file_name = "" ):
@@ -98,8 +106,11 @@ def _parse_json_findings(self, data, test, *, file_name=""):
98
106
99
107
for item in data :
100
108
# Skip items without required fields
101
- if not isinstance (item , dict ) or "message" not in item :
102
- logger .debug (f"Skipping Prowler finding because it's not a dict or missing 'message' field: { item } " )
109
+ if not isinstance (item , dict ):
110
+ logger .debug (f"Skipping Prowler finding because it's not a dict: { item } " )
111
+ continue
112
+ if "message" not in item :
113
+ logger .debug (f"Skipping Prowler finding because it's missing 'message' field: { item } " )
103
114
continue
104
115
105
116
# Get basic information
@@ -157,47 +168,19 @@ def _parse_json_findings(self, data, test, *, file_name=""):
157
168
if "finding_info" in item and isinstance (item ["finding_info" ], dict ):
158
169
unique_id = item ["finding_info" ].get ("uid" , "" )
159
170
160
- # Extract check ID from various places
171
+ # Get check ID: prefer finding_info["check_id"], then fall back to the top-level "check_id"
161
172
check_id = None
162
- if "check_id" in item :
173
+ if "finding_info" in item and isinstance (item ["finding_info" ], dict ):
174
+ check_id = item ["finding_info" ].get ("check_id" )
175
+ # Fall back to top-level check_id if not found in finding_info
176
+ if not check_id and "check_id" in item :
163
177
check_id = item .get ("check_id" )
164
- elif (
165
- "finding_info" in item and isinstance (item ["finding_info" ], dict ) and "check_id" in item ["finding_info" ]
166
- ):
167
- check_id = item ["finding_info" ]["check_id" ]
168
-
169
- # Map certain titles or contents to standardized check IDs
170
- # This helps with consistency across different formats
171
-
172
- # For AWS
173
- if cloud_provider == "aws" or (not cloud_provider and "Hardware MFA" in title ):
174
- if "Hardware MFA" in title or "hardware_mfa" in title .lower ():
175
- check_id = "iam_root_hardware_mfa_enabled"
176
-
177
- # For Azure
178
- elif cloud_provider == "azure" or (not cloud_provider and "Network policy" in title ):
179
- if "Network policy" in title or "network policy" in title .lower () or "cluster" in title :
180
- check_id = "aks_network_policy_enabled"
181
-
182
- # For GCP
183
- elif cloud_provider == "gcp" or (
184
- not cloud_provider and any (x in title .lower () for x in ["rdp" , "firewall" ])
185
- ):
186
- if "rdp" in title .lower () or "firewall" in title .lower ():
187
- check_id = "bc_gcp_networking_2"
188
-
189
- # For Kubernetes
190
- elif cloud_provider == "kubernetes" or (not cloud_provider and "AlwaysPullImages" in title ):
191
- if "AlwaysPullImages" in title :
192
- check_id = "bc_k8s_pod_security_1"
193
178
194
179
# Get remediation information
195
180
remediation = ""
196
181
if "remediation" in item and isinstance (item ["remediation" ], dict ):
197
- if "text" in item ["remediation" ]:
198
- remediation = item ["remediation" ]["text" ]
199
- elif "desc" in item ["remediation" ]:
200
- remediation = item ["remediation" ]["desc" ]
182
+ # Try to get remediation - prefer "text" field but fall back to "desc" if needed
183
+ remediation = item ["remediation" ].get ("text" , item ["remediation" ].get ("desc" , "" ))
201
184
202
185
# Add notes to description
203
186
if status_code :
@@ -227,6 +210,10 @@ def _parse_json_findings(self, data, test, *, file_name=""):
227
210
# Add additional metadata
228
211
finding .unsaved_tags = []
229
212
213
+ # Extract date if available
214
+ if "finding_info" in item and isinstance (item ["finding_info" ], dict ) and "created_time_dt" in item ["finding_info" ]:
215
+ finding .date = item ["finding_info" ]["created_time_dt" ]
216
+
230
217
# Add cloud provider as tag if available
231
218
if cloud_provider :
232
219
finding .unsaved_tags .append (cloud_provider )
@@ -287,7 +274,6 @@ def _parse_csv_findings(self, csv_data, test, *, file_name=""):
287
274
check_id = row .get ("CHECK_ID" , "" )
288
275
check_title = row .get ("CHECK_TITLE" , "" )
289
276
provider = row .get ("PROVIDER" , "" ).lower ()
290
- service_name = row .get ("SERVICE_NAME" , "" )
291
277
292
278
# Original check ID before any standardization (for titles)
293
279
original_check_id = check_id
@@ -306,9 +292,9 @@ def _parse_csv_findings(self, csv_data, test, *, file_name=""):
306
292
307
293
# Construct title
308
294
if original_check_id and check_title :
309
- title = f"{ original_check_id } : { check_title } "
310
- elif original_check_id :
311
- title = original_check_id
295
+ title = f"{ check_id } : { check_title } "
296
+ elif check_id :
297
+ title = check_id
312
298
elif check_title :
313
299
title = check_title
314
300
else :
@@ -382,6 +368,13 @@ def _parse_csv_findings(self, csv_data, test, *, file_name=""):
382
368
383
369
# Add provider as tag if available
384
370
finding .unsaved_tags = []
371
+
372
+ # Extract date if available
373
+ if row .get ("TIMESTAMP" , "" ):
374
+ finding .date = row .get ("TIMESTAMP" )
375
+ elif row .get ("ASSESSMENT_START_TIME" , "" ):
376
+ finding .date = row .get ("ASSESSMENT_START_TIME" )
377
+
385
378
if provider :
386
379
finding .unsaved_tags .append (provider )
387
380
# If no provider in the CSV but we can infer it from check_id or title
0 commit comments