7
7
import os
8
8
import scipy .io
9
9
import numpy as np
10
- from datetime import datetime
10
+ from datetime import datetime , timedelta
11
11
12
12
from element_interface .utils import dict_to_uuid
13
13
@@ -191,8 +191,7 @@ def run_modules(self):
191
191
if module_status ['completion_time' ] is not None :
192
192
continue
193
193
194
- module_output_json = module_input_json .replace ('-input.json' ,
195
- '-' + module + '-output.json' )
194
+ module_output_json = self ._get_module_output_json_filename (module )
196
195
command = (sys .executable
197
196
+ " -W ignore -m ecephys_spike_sorting.modules." + module
198
197
+ " --input_json " + module_input_json
@@ -233,11 +232,19 @@ def _update_module_status(self, updated_module_status={}):
233
232
with open (self ._modules_input_hash_fp ) as f :
234
233
modules_status = json .load (f )
235
234
modules_status = {** modules_status , ** updated_module_status }
235
+ modules_status ['cumulative_execution_duration' ] = sum (
236
+ v ['duration' ] or 0 for k , v in modules_status .items ()
237
+ if k not in ('cumulative_execution_duration' , 'total_duration' ))
238
+ modules_status ['total_duration' ] = (
239
+ modules_status [self ._modules [- 1 ]]['completion_time' ]
240
+ - modules_status [self ._modules [0 ]]['start_time' ]).total_seconds ()
236
241
else :
237
242
modules_status = {module : {'start_time' : None ,
238
243
'completion_time' : None ,
239
244
'duration' : None }
240
245
for module in self ._modules }
246
+ modules_status ['cumulative_execution_duration' ] = 0
247
+ modules_status ['total_duration' ] = 0
241
248
with open (self ._modules_input_hash_fp , 'w' ) as f :
242
249
json .dump (modules_status , f , default = str )
243
250
@@ -248,10 +255,30 @@ def _get_module_status(self, module):
248
255
if self ._modules_input_hash_fp .exists ():
249
256
with open (self ._modules_input_hash_fp ) as f :
250
257
modules_status = json .load (f )
258
+ if modules_status [module ]['completion_time' ] is None :
259
+ # additional logic to read from the "-output.json" file for this module as well
260
+ # handle cases where the module has finished successfully,
261
+ # but the "_modules_input_hash_fp" is not updated (for whatever reason),
262
+ # resulting in this module not registered as completed in the "_modules_input_hash_fp"
263
+ modules_module_output_json_fp = pathlib .Path (self ._get_module_output_json_filename (module ))
264
+ if modules_module_output_json_fp .exists ():
265
+ with open (modules_module_output_json_fp ) as f :
266
+ module_run_output = json .load (f )
267
+ modules_status [module ]['duration' ] = module_run_output ['execution_time' ]
268
+ modules_status [module ]['completion_time' ] = (
269
+ modules_status [module ]['start_time' ]
270
+ + timedelta (seconds = module_run_output ['execution_time' ]))
251
271
return modules_status [module ]
252
272
253
273
return {'start_time' : None , 'completion_time' : None , 'duration' : None }
254
274
275
+ def _get_module_output_json_filename (self , module ):
276
+ module_input_json = self ._module_input_json .as_posix ()
277
+ module_output_json = module_input_json .replace (
278
+ '-input.json' ,
279
+ '-' + module + '-' + self ._modules_input_hash + '-output.json' )
280
+ return module_output_json
281
+
255
282
256
283
class OpenEphysKilosortPipeline :
257
284
"""
0 commit comments