103
103
from spdx_tools .spdx .parser .error import SPDXParsingError
104
104
from spdx_tools .spdx .parser .parse_anything import parse_file
105
105
106
+ # Used when we are polling for successful upload and processing
107
+ global MAX_RETRIES
108
+ global SLEEP
109
+ MAX_RETRIES = 30
110
+ SLEEP = 5
111
+
106
112
logging .basicConfig (
107
113
level = logging .INFO ,
108
114
format = "[%(asctime)s] {%(module)s:%(lineno)d} %(levelname)s - %(message)s"
@@ -181,88 +187,60 @@ def get_sbom_mime_type(filename):
181
187
return 'application/spdx'
182
188
return None
183
189
184
- # Poll for successful scan of SBOM.
185
- # Input: Name of SBOM document (not the filename, the name defined inside the json body)
186
- # Returns on success. Errors will result in fatal exit.
187
- def poll_for_upload (sbom_name ):
188
- max_retries = 30
189
- sleep_time = 10
190
- matched_scan = False
190
+ def poll_notifications_for_success (cl , proj_version_url , summaries_url ):
191
+ # We want to locate a notification for
192
+ # VERSION_BOM_CODE_LOCATION_BOM_COMPUTED
193
+ # matching our proj_version_url and our codelocation
194
+ retries = MAX_RETRIES
195
+ sleep_time = SLEEP
191
196
192
- # Replace any spaces in the name with a dash to match BD
193
- sbom_name = sbom_name .replace (' ' , '-' )
197
+ # current theory: if a scan happened and we matched NOTHING, we
198
+ # aren't going to get a BOM_COMPUTED notification. so is there any type
199
+ # of notif that we DO get?
194
200
195
- # Search for the latest scan matching our SBOM
196
201
params = {
197
- 'q ' : [f"name: { sbom_name } " ],
198
- 'sort' : ["updatedAt : ASC" ]
202
+ 'filter ' : ["notificationType:VERSION_BOM_CODE_LOCATION_BOM_COMPUTED " ],
203
+ 'sort' : ["createdAt : ASC" ]
199
204
}
200
- cls = bd .get_resource ('codeLocations' , params = params )
201
- for cl in cls :
202
- # Force exact match of: spdx_doc_name + " spdx/sbom"
203
- # BD appends the "spdx/sbom" string to the name.
204
- if cl ['name' ] != sbom_name + " spdx/sbom" :
205
- continue
206
-
207
- matched_scan = True
208
- for link in (cl ['_meta' ]['links' ]):
209
- # Locate the scans URL to check for status
210
- if link ['rel' ] == "scans" :
211
- summaries_url = link ['href' ]
212
- break
213
-
214
- assert (summaries_url )
215
- params = {
216
- 'sort' : ["updatedAt: ASC" ]
217
- }
218
-
219
- while (max_retries ):
220
- max_retries -= 1
221
- for item in bd .get_items (summaries_url , params = params ):
222
- # Only checking the first item as it's the most recent
223
- if item ['scanState' ] == "SUCCESS" :
224
- print ("BOM upload complete" )
225
- return
226
- elif item ['scanState' ] == "FAILURE" :
227
- logging .error (f"SPDX Scan Failure: { item ['statusMessage' ]} " )
228
- sys .exit (1 )
229
- else :
230
- # Only other state should be "STARTED" -- keep polling
231
- print (f"Waiting for status success, currently: { item ['scanState' ]} " )
232
- time .sleep (sleep_time )
233
- # Break out of for loop so we always check the most recent
234
- break
235
-
236
- # Handle various errors that might happen
237
- if max_retries == 0 :
238
- logging .error ("Failed to verify successful SPDX Scan in {max_retries * sleep_time} seconds" )
239
- elif not matched_scan :
240
- logging .error (f"No scan found for SBOM: { sbom_name } " )
241
- else :
242
- logging .error (f"Unable to verify successful scan of SBOM: { sbom_name } " )
243
205
244
- # If we got this far, it's a fatal error.
206
+ while (retries ):
207
+ retries -= 1
208
+ for result in bd .get_items ("/api/notifications" , params = params ):
209
+ if 'projectVersion' not in result ['content' ]:
210
+ # skip it (shouldn't be possible due to the filter)
211
+ continue
212
+ # We're checking the entire list of notifications, but ours is
213
+ # likely to be the first. Walking the whole list to make
214
+ # sure we find an exact match.
215
+ if result ['content' ]['projectVersion' ] == proj_version_url and \
216
+ result ['content' ]['codeLocation' ] == cl ['_meta' ]['href' ] and \
217
+ result ['content' ]['scanSummary' ] == summaries_url :
218
+ print ("BOM calculation complete" )
219
+ return
220
+
221
+ print ("Waiting for BOM calculation to complete" )
222
+ time .sleep (sleep_time )
223
+
224
+ logging .error (f"Failed to verify successful BOM computed in { retries * sleep_time } seconds" )
245
225
sys .exit (1 )
246
226
247
- # Poll for successful scan of SBOM
248
- # Inputs:
249
- # sbom_name: Name of SBOM document (not the filename)
250
- # version: project version to check
227
+ # Poll for successful scan of SBOM.
228
+ # Input: Name of SBOM document (not the filename, the name defined inside the json body)
251
229
# Returns on success. Errors will result in fatal exit.
252
- def poll_for_sbom_scan (sbom_name , projver ):
253
- max_retries = 30
254
- sleep_time = 10
230
+ def poll_for_sbom_complete (sbom_name , proj_version_url ):
231
+ retries = MAX_RETRIES
232
+ sleep_time = SLEEP
255
233
matched_scan = False
256
234
257
235
# Replace any spaces in the name with a dash to match BD
258
236
sbom_name = sbom_name .replace (' ' , '-' )
259
237
260
- # Search for the latest scan matching our SBOM
238
+ # Search for the latest scan matching our SBOM name
261
239
params = {
262
240
'q' : [f"name:{ sbom_name } " ],
263
241
'sort' : ["updatedAt: ASC" ]
264
242
}
265
- cls = bd .get_resource ('codelocations' , projver , params = params )
243
+ cls = bd .get_resource ('codeLocations' , params = params )
266
244
for cl in cls :
267
245
# Force exact match of: spdx_doc_name + " spdx/sbom"
268
246
# BD appends the "spdx/sbom" string to the name.
@@ -272,42 +250,67 @@ def poll_for_sbom_scan(sbom_name, projver):
272
250
matched_scan = True
273
251
for link in (cl ['_meta' ]['links' ]):
274
252
# Locate the scans URL to check for status
275
- if link ['rel' ] == "scans " :
276
- summaries_url = link ['href' ]
253
+ if link ['rel' ] == "latest-scan " :
254
+ latest_url = link ['href' ]
277
255
break
278
256
279
- assert ( summaries_url )
280
- params = {
281
- 'sort' : [ "updatedAt: ASC" ]
282
- }
257
+ assert latest_url , "Failed to locate latest-scan reference"
258
+ if not matched_scan :
259
+ logging . error ( f"No scan found for SBOM: { sbom_name } " )
260
+ sys . exit ( 1 )
283
261
284
- while (max_retries ):
285
- max_retries -= 1
286
- for item in bd .get_items (summaries_url , params = params ):
287
- # Only checking the first item as it's the most recent
288
- if item ['scanState' ] == "SUCCESS" :
289
- print ("BOM scan complete" )
290
- return
291
- elif item ['scanState' ] == "FAILURE" :
292
- logging .error (f"SPDX Scan Failure: { item ['statusMessage' ]} " )
293
- sys .exit (1 )
294
- else :
295
- # Only other state should be "STARTED" -- keep polling
296
- print (f"Waiting for status success, currently: { item ['scanState' ]} " )
297
- time .sleep (sleep_time )
298
- # Break out of for loop so we always check the most recent
299
- break
262
+ # Wait for scanState = SUCCESS
263
+ while (retries ):
264
+ json_data = bd .get_json (latest_url )
265
+ retries -= 1
266
+ if json_data ['scanState' ] == "SUCCESS" :
267
+ print ("BOM upload complete" )
268
+ break
269
+ elif json_data ['scanState' ] == "FAILURE" :
270
+ logging .error (f"SPDX Scan Failure: { json_data ['statusMessage' ]} " )
271
+ sys .exit (1 )
272
+ else :
273
+ # Only other state should be "STARTED" -- keep polling
274
+ print (f"Waiting for status success, currently: { json_data ['scanState' ]} " )
275
+ time .sleep (sleep_time )
276
+
277
+ # If there were ZERO matches, there will never be a notification of
278
+ # BOM import success. Short-circuit that check and treat this as success.
279
+ if json_data ['matchCount' ] == 0 :
280
+ print ("No KB matches in BOM, continuing..." )
281
+ return
282
+
283
+ # Save the codelocation summaries_url
284
+ summaries_url = json_data ['_meta' ]['href' ]
285
+
286
+ # Greedy match - extract the scan id out of the URL
287
+ #scanid = re.findall(r'.*\/(.*)', json_data['_meta']['href'])
288
+ # proj_Version_url/bom-status/scanid does NOT WORK
289
+
290
+ # TODO this seems actually fairly pointless - it get stuck in UP_TO_DATE
291
+ retries = MAX_RETRIES
292
+ while (retries ):
293
+ json_data = bd .get_json (proj_version_url + "/bom-status" )
294
+ retries -= 1
295
+ if json_data ['status' ] == "UP_TO_DATE" :
296
+ print ("BOM import complete" )
297
+ break
298
+ elif json_data ['status' ] == "FAILURE" :
299
+ logging .error (f"BOM Import failure: { json_data ['status' ]} " )
300
+ sys .exit (1 )
301
+ else :
302
+ print (f"Waiting for BOM import completion, current status: { json_data ['status' ]} " )
303
+ time .sleep (sleep_time )
300
304
301
- # Handle various errors that might happen
302
- if max_retries == 0 :
303
- logging .error ("Failed to verify successful SPDX Scan in {max_retries * sleep_time} seconds" )
304
- elif not matched_scan :
305
- logging .error (f"No scan found for SBOM: { sbom_name } " )
306
- else :
307
- logging .error (f"Unable to verify successful scan of SBOM: { sbom_name } " )
305
+ if retries == 0 :
306
+ logging .error ("Failed to verify successful SBOM import in {retries * sleep_time} seconds" )
307
+ sys .exit (1 )
308
308
309
- # If we got this far, it's a fatal error.
310
- sys .exit (1 )
309
+ # Finally check notifications
310
+ poll_notifications_for_success (cl , proj_version_url , summaries_url )
311
+
312
+ # Any errors above already resulted in fatal exit
313
+ return
311
314
312
315
# Upload provided SBOM file to Black Duck
313
316
# Inputs:
@@ -328,7 +331,7 @@ def upload_sbom_file(filename, project, version):
328
331
logging .error (f"File { filename } is already mapped to a different project version" )
329
332
330
333
if response .status_code != 201 :
331
- logging .error (f"Failed to upload SPDX file: " )
334
+ logging .error (f"Failed to upload SPDX file" )
332
335
try :
333
336
pprint (response .json ()['errorMessage' ])
334
337
except :
@@ -560,9 +563,7 @@ def main():
560
563
upload_sbom_file (args .spdx_file , args .project_name , args .version_name )
561
564
562
565
# Wait for scan completion. Will exit if it fails.
563
- poll_for_upload (document .creation_info .name )
564
- # Also exits on failure. This may be somewhat redundant.
565
- poll_for_sbom_scan (document .creation_info .name , version )
566
+ poll_for_sbom_complete (document .creation_info .name , proj_version_url )
566
567
567
568
# Open unmatched component file to save name, spdxid, version, and
568
569
# origin/purl for later in json format
@@ -630,7 +631,7 @@ def main():
630
631
if (kb_match ):
631
632
# Update package name and version to reflect the KB name/ver
632
633
print (f" KB match for { package .name } { package .version } " )
633
- kb_matches += 1
634
+ kb_matches += 1
634
635
matchname = kb_match ['componentName' ]
635
636
matchver = kb_match ['versionName' ]
636
637
else :
@@ -667,6 +668,8 @@ def main():
667
668
if kb_match :
668
669
print (f" WARNING: { matchname } { matchver } in KB but not in SBOM" )
669
670
add_to_sbom (proj_version_url , kb_match ['version' ])
671
+ # TODO TEMP DEBUG TO CATCH THIS
672
+ quit ()
670
673
# short-circuit the rest
671
674
continue
672
675
0 commit comments