107
107
global MAX_RETRIES
108
108
global SLEEP
109
109
MAX_RETRIES = 30
110
- SLEEP = 5
110
+ SLEEP = 10
111
111
112
112
logging .basicConfig (
113
113
level = logging .INFO ,
114
114
format = "[%(asctime)s] {%(module)s:%(lineno)d} %(levelname)s - %(message)s"
115
115
)
116
116
117
+ logging .getLogger ("blackduck" ).setLevel (logging .CRITICAL )
118
+
117
119
# Validates BD project and version
118
120
# Inputs:
119
121
# projname - Name of project
@@ -187,17 +189,18 @@ def get_sbom_mime_type(filename):
187
189
return 'application/spdx'
188
190
return None
189
191
190
- def poll_notifications_for_success (cl , proj_version_url , summaries_url ):
191
- # We want to locate a notification for
192
- # VERSION_BOM_CODE_LOCATION_BOM_COMPUTED
193
- # matching our proj_version_url and our codelocation
192
+ # Poll for notification alerting us of successful BOM computation
193
+ #
194
+ # Inputs:
195
+ # cl_url - Code Loction URL to match
196
+ # proj_version_url - Project Version URL to match
197
+ # summaries_url - Summaries URL from codelocation
198
+ #
199
+ # Returns on success. Errors are fatal.
200
+ def poll_notifications_for_success (cl_url , proj_version_url , summaries_url ):
194
201
retries = MAX_RETRIES
195
202
sleep_time = SLEEP
196
203
197
- # current theory: if a scan happened and we matched NOTHING, we
198
- # aren't going to get a BOM_COMPUTED notification. so is there any type
199
- # of notif that we DO get?
200
-
201
204
params = {
202
205
'filter' : ["notificationType:VERSION_BOM_CODE_LOCATION_BOM_COMPUTED" ],
203
206
'sort' : ["createdAt: ASC" ]
@@ -207,30 +210,35 @@ def poll_notifications_for_success(cl, proj_version_url, summaries_url):
207
210
retries -= 1
208
211
for result in bd .get_items ("/api/notifications" , params = params ):
209
212
if 'projectVersion' not in result ['content' ]:
210
- # skip it (shouldn 't be possible due to the filter)
213
+ # Shouldn 't be possible due to the filter
211
214
continue
212
- # We're checking the entire list of notifications, but ours is
213
- # likely to be the first. Walking the whole list to make
214
- # sure we find an exact match.
215
+ # We're checking the entire list of notifications, but ours should
216
+ # be near the top.
215
217
if result ['content' ]['projectVersion' ] == proj_version_url and \
216
- result ['content' ]['codeLocation' ] == cl [ '_meta' ][ 'href' ] and \
218
+ result ['content' ]['codeLocation' ] == cl_url and \
217
219
result ['content' ]['scanSummary' ] == summaries_url :
218
220
print ("BOM calculation complete" )
219
221
return
220
222
221
223
print ("Waiting for BOM calculation to complete" )
222
224
time .sleep (sleep_time )
223
225
224
- logging .error (f"Failed to verify successful BOM computed in { retries * sleep_time } seconds" )
226
+ logging .error (f"Failed to verify successful BOM computed in { MAX_RETRIES * sleep_time } seconds" )
225
227
sys .exit (1 )
226
228
227
229
# Poll for successful scan of SBOM.
228
- # Input: Name of SBOM document (not the filename, the name defined inside the json body)
230
+ # Inputs:
231
+ #
232
+ # sbom_name: Name of SBOM document (not the filename, the name defined
233
+ # inside the json body)
234
+ # proj_version_url: Project version url
235
+ #
229
236
# Returns on success. Errors will result in fatal exit.
230
237
def poll_for_sbom_complete (sbom_name , proj_version_url ):
231
238
retries = MAX_RETRIES
232
239
sleep_time = SLEEP
233
240
matched_scan = False
241
+ cl_url = None
234
242
235
243
# Replace any spaces in the name with a dash to match BD
236
244
sbom_name = sbom_name .replace (' ' , '-' )
@@ -242,23 +250,29 @@ def poll_for_sbom_complete(sbom_name, proj_version_url):
242
250
}
243
251
cls = bd .get_resource ('codeLocations' , params = params )
244
252
for cl in cls :
253
+ if matched_scan :
254
+ break
245
255
# Force exact match of: spdx_doc_name + " spdx/sbom"
246
256
# BD appends the "spdx/sbom" string to the name.
247
257
if cl ['name' ] != sbom_name + " spdx/sbom" :
248
258
continue
249
259
250
260
matched_scan = True
261
+ cl_url = cl ['_meta' ]['href' ]
262
+
251
263
for link in (cl ['_meta' ]['links' ]):
252
264
# Locate the scans URL to check for status
253
265
if link ['rel' ] == "latest-scan" :
254
266
latest_url = link ['href' ]
255
267
break
256
268
257
- assert latest_url , "Failed to locate latest-scan reference"
258
269
if not matched_scan :
259
270
logging .error (f"No scan found for SBOM: { sbom_name } " )
260
271
sys .exit (1 )
261
272
273
+ assert latest_url , "Failed to locate latest-scan reference"
274
+ assert cl_url , "Failed to locate codelocation reference"
275
+
262
276
# Wait for scanState = SUCCESS
263
277
while (retries ):
264
278
json_data = bd .get_json (latest_url )
@@ -271,43 +285,40 @@ def poll_for_sbom_complete(sbom_name, proj_version_url):
271
285
sys .exit (1 )
272
286
else :
273
287
# Only other state should be "STARTED" -- keep polling
274
- print (f"Waiting for status success , currently: { json_data ['scanState' ]} " )
288
+ print (f"Waiting for scan completion , currently: { json_data ['scanState' ]} " )
275
289
time .sleep (sleep_time )
276
290
277
291
# If there were ZERO matches, there will never be a notification of
278
- # BOM import success. Short-circuit that check and treat this as success.
292
+ # BOM import success. Short-circuit the check and treat this as success.
279
293
if json_data ['matchCount' ] == 0 :
280
- print ("No KB matches in BOM , continuing..." )
294
+ print ("No BOM KB matches, continuing..." )
281
295
return
282
296
283
297
# Save the codelocation summaries_url
284
298
summaries_url = json_data ['_meta' ]['href' ]
285
299
286
- # Greedy match - extract the scan id out of the URL
287
- #scanid = re.findall(r'.*\/(.*)', json_data['_meta']['href'])
288
- # proj_Version_url/bom-status/scanid does NOT WORK
289
-
290
- # TODO this seems actually fairly pointless - it get stuck in UP_TO_DATE
300
+ # Check the bom-status endpoint for success
291
301
retries = MAX_RETRIES
292
302
while (retries ):
293
303
json_data = bd .get_json (proj_version_url + "/bom-status" )
294
304
retries -= 1
295
305
if json_data ['status' ] == "UP_TO_DATE" :
296
306
print ("BOM import complete" )
297
307
break
298
- elif json_data ['status' ] == "FAILURE" :
299
- logging .error (f"BOM Import failure: { json_data ['status' ]} " )
308
+ elif json_data ['status' ] == "UP_TO_DATE_WITH_ERRORS" or \
309
+ json_data ['status' ] == "PROCESSING_WITH_ERRORS" :
310
+ logging .error (f"BOM Import failure: status is { json_data ['status' ]} " )
300
311
sys .exit (1 )
301
312
else :
302
313
print (f"Waiting for BOM import completion, current status: { json_data ['status' ]} " )
303
314
time .sleep (sleep_time )
304
315
305
316
if retries == 0 :
306
- logging .error ("Failed to verify successful SBOM import in {retries * sleep_time} seconds" )
317
+ logging .error (f "Failed to verify successful SBOM import in { retries * sleep_time } seconds" )
307
318
sys .exit (1 )
308
319
309
320
# Finally check notifications
310
- poll_notifications_for_success (cl , proj_version_url , summaries_url )
321
+ poll_notifications_for_success (cl_url , proj_version_url , summaries_url )
311
322
312
323
# Any errors above already resulted in fatal exit
313
324
return
@@ -580,8 +591,7 @@ def main():
580
591
package_count = 0
581
592
cust_comp_count = 0
582
593
cust_ver_count = 0
583
- # Saving all encountered components by their name+version
584
- # Used for debugging repeated package data
594
+ # Used for tracking repeated package data
585
595
packages = {}
586
596
# Saved component data to write to file
587
597
comps_out = []
@@ -595,8 +605,10 @@ def main():
595
605
596
606
if package .name == "" :
597
607
# Strange case where the package name is empty. Skip it.
598
- logging .warning ("WARNING: package name empty, skipping" )
608
+ logging .warning ("WARNING: Skipping empty package name. Package info:" )
609
+ pprint (package )
599
610
continue
611
+
600
612
# Trim any odd leading/trailing space or newlines
601
613
package .name = package .name .strip ()
602
614
@@ -617,9 +629,8 @@ def main():
617
629
if package .external_references :
618
630
foundpurl = False
619
631
for ref in package .external_references :
620
- # There can be multiple extrefs - try to locate a purl
621
- # If there should happen to be multiple purls,
622
- # we only consider the first.
632
+ # There can be multiple extrefs; try to locate a purl.
633
+ # If there are multiple purls, use the first one.
623
634
if (ref .reference_type == "purl" ):
624
635
foundpurl = True
625
636
kb_match = find_comp_in_kb (ref .locator )
@@ -668,8 +679,6 @@ def main():
668
679
if kb_match :
669
680
print (f" WARNING: { matchname } { matchver } in KB but not in SBOM" )
670
681
add_to_sbom (proj_version_url , kb_match ['version' ])
671
- # TODO TEMP DEBUG TO CATCH THIS
672
- quit ()
673
682
# short-circuit the rest
674
683
continue
675
684
0 commit comments