@@ -198,6 +198,10 @@ def run_modules(self):
198
198
+ " --output_json " + module_output_json )
199
199
200
200
start_time = datetime .utcnow ()
201
+ self ._update_module_status (
202
+ {module : {'start_time' : start_time ,
203
+ 'completion_time' : None ,
204
+ 'duration' : None }})
201
205
with open (module_logfile , "a" ) as f :
202
206
subprocess .check_call (command .split (' ' ), stdout = f )
203
207
completion_time = datetime .utcnow ()
@@ -206,6 +210,8 @@ def run_modules(self):
206
210
'completion_time' : completion_time ,
207
211
'duration' : (completion_time - start_time ).total_seconds ()}})
208
212
213
+ self ._update_total_duration ()
214
+
209
215
def _get_raw_data_filepaths (self ):
210
216
session_str , gate_str , _ , probe_str = self .parse_input_filename ()
211
217
@@ -232,19 +238,11 @@ def _update_module_status(self, updated_module_status={}):
232
238
with open (self ._modules_input_hash_fp ) as f :
233
239
modules_status = json .load (f )
234
240
modules_status = {** modules_status , ** updated_module_status }
235
- modules_status ['cumulative_execution_duration' ] = sum (
236
- v ['duration' ] or 0 for k , v in modules_status .items ()
237
- if k not in ('cumulative_execution_duration' , 'total_duration' ))
238
- modules_status ['total_duration' ] = (
239
- modules_status [self ._modules [- 1 ]]['completion_time' ]
240
- - modules_status [self ._modules [0 ]]['start_time' ]).total_seconds ()
241
241
else :
242
242
modules_status = {module : {'start_time' : None ,
243
243
'completion_time' : None ,
244
244
'duration' : None }
245
245
for module in self ._modules }
246
- modules_status ['cumulative_execution_duration' ] = 0
247
- modules_status ['total_duration' ] = 0
248
246
with open (self ._modules_input_hash_fp , 'w' ) as f :
249
247
json .dump (modules_status , f , default = str )
250
248
@@ -260,13 +258,13 @@ def _get_module_status(self, module):
260
258
# handle cases where the module has finished successfully,
261
259
# but the "_modules_input_hash_fp" is not updated (for whatever reason),
262
260
# resulting in this module not registered as completed in the "_modules_input_hash_fp"
263
- modules_module_output_json_fp = pathlib .Path (self ._get_module_output_json_filename (module ))
264
- if modules_module_output_json_fp .exists ():
265
- with open (modules_module_output_json_fp ) as f :
261
+ module_output_json_fp = pathlib .Path (self ._get_module_output_json_filename (module ))
262
+ if module_output_json_fp .exists ():
263
+ with open (module_output_json_fp ) as f :
266
264
module_run_output = json .load (f )
267
265
modules_status [module ]['duration' ] = module_run_output ['execution_time' ]
268
266
modules_status [module ]['completion_time' ] = (
269
- modules_status [module ]['start_time' ]
267
+ datetime . strptime ( modules_status [module ]['start_time' ], '%Y-%m-%d %H:%M:%S.%f' )
270
268
+ timedelta (seconds = module_run_output ['execution_time' ]))
271
269
return modules_status [module ]
272
270
@@ -276,9 +274,23 @@ def _get_module_output_json_filename(self, module):
276
274
module_input_json = self ._module_input_json .as_posix ()
277
275
module_output_json = module_input_json .replace (
278
276
'-input.json' ,
279
- '-' + module + '-' + self ._modules_input_hash + '-output.json' )
277
+ '-' + module + '-' + str ( self ._modules_input_hash ) + '-output.json' )
280
278
return module_output_json
281
279
280
+ def _update_total_duration (self ):
281
+ with open (self ._modules_input_hash_fp ) as f :
282
+ modules_status = json .load (f )
283
+ cumulative_execution_duration = sum (
284
+ v ['duration' ] or 0 for k , v in modules_status .items ()
285
+ if k not in ('cumulative_execution_duration' , 'total_duration' ))
286
+ total_duration = (
287
+ datetime .strptime (modules_status [self ._modules [- 1 ]]['completion_time' ], '%Y-%m-%d %H:%M:%S.%f' )
288
+ - datetime .strptime (modules_status [self ._modules [0 ]]['start_time' ], '%Y-%m-%d %H:%M:%S.%f' )
289
+ ).total_seconds ()
290
+ self ._update_module_status (
291
+ {'cumulative_execution_duration' : cumulative_execution_duration ,
292
+ 'total_duration' : total_duration })
293
+
282
294
283
295
class OpenEphysKilosortPipeline :
284
296
"""
@@ -388,6 +400,10 @@ def run_modules(self):
388
400
+ " --output_json " + module_output_json )
389
401
390
402
start_time = datetime .utcnow ()
403
+ self ._update_module_status (
404
+ {module : {'start_time' : start_time ,
405
+ 'completion_time' : None ,
406
+ 'duration' : None }})
391
407
with open (module_logfile , "a" ) as f :
392
408
subprocess .check_call (command .split (' ' ), stdout = f )
393
409
completion_time = datetime .utcnow ()
@@ -396,6 +412,8 @@ def run_modules(self):
396
412
'completion_time' : completion_time ,
397
413
'duration' : (completion_time - start_time ).total_seconds ()}})
398
414
415
+ self ._update_total_duration ()
416
+
399
417
def _update_module_status (self , updated_module_status = {}):
400
418
if self ._modules_input_hash is None :
401
419
raise RuntimeError ('"generate_modules_input_json()" not yet performed!' )
@@ -420,10 +438,44 @@ def _get_module_status(self, module):
420
438
if self ._modules_input_hash_fp .exists ():
421
439
with open (self ._modules_input_hash_fp ) as f :
422
440
modules_status = json .load (f )
441
+ if modules_status [module ]['completion_time' ] is None :
442
+ # additional logic to read from the "-output.json" file for this module as well
443
+ # handle cases where the module has finished successfully,
444
+ # but the "_modules_input_hash_fp" is not updated (for whatever reason),
445
+ # resulting in this module not registered as completed in the "_modules_input_hash_fp"
446
+ module_output_json_fp = pathlib .Path (self ._get_module_output_json_filename (module ))
447
+ if module_output_json_fp .exists ():
448
+ with open (module_output_json_fp ) as f :
449
+ module_run_output = json .load (f )
450
+ modules_status [module ]['duration' ] = module_run_output ['execution_time' ]
451
+ modules_status [module ]['completion_time' ] = (
452
+ datetime .strptime (modules_status [module ]['start_time' ], '%Y-%m-%d %H:%M:%S.%f' )
453
+ + timedelta (seconds = module_run_output ['execution_time' ]))
423
454
return modules_status [module ]
424
455
425
456
return {'start_time' : None , 'completion_time' : None , 'duration' : None }
426
457
458
+ def _get_module_output_json_filename (self , module ):
459
+ module_input_json = self ._module_input_json .as_posix ()
460
+ module_output_json = module_input_json .replace (
461
+ '-input.json' ,
462
+ '-' + module + '-' + str (self ._modules_input_hash ) + '-output.json' )
463
+ return module_output_json
464
+
465
+ def _update_total_duration (self ):
466
+ with open (self ._modules_input_hash_fp ) as f :
467
+ modules_status = json .load (f )
468
+ cumulative_execution_duration = sum (
469
+ v ['duration' ] or 0 for k , v in modules_status .items ()
470
+ if k not in ('cumulative_execution_duration' , 'total_duration' ))
471
+ total_duration = (
472
+ datetime .strptime (modules_status [self ._modules [- 1 ]]['completion_time' ], '%Y-%m-%d %H:%M:%S.%f' )
473
+ - datetime .strptime (modules_status [self ._modules [0 ]]['start_time' ], '%Y-%m-%d %H:%M:%S.%f' )
474
+ ).total_seconds ()
475
+ self ._update_module_status (
476
+ {'cumulative_execution_duration' : cumulative_execution_duration ,
477
+ 'total_duration' : total_duration })
478
+
427
479
428
480
def run_pykilosort (continuous_file , kilosort_output_directory , params ,
429
481
channel_ind , x_coords , y_coords , shank_ind , connected , sample_rate ):
0 commit comments