7
7
import os
8
8
import scipy .io
9
9
import numpy as np
10
- from datetime import datetime
10
+ from datetime import datetime , timedelta
11
11
12
12
from element_interface .utils import dict_to_uuid
13
13
@@ -191,14 +191,17 @@ def run_modules(self):
191
191
if module_status ['completion_time' ] is not None :
192
192
continue
193
193
194
- module_output_json = module_input_json .replace ('-input.json' ,
195
- '-' + module + '-output.json' )
194
+ module_output_json = self ._get_module_output_json_filename (module )
196
195
command = (sys .executable
197
196
+ " -W ignore -m ecephys_spike_sorting.modules." + module
198
197
+ " --input_json " + module_input_json
199
198
+ " --output_json " + module_output_json )
200
199
201
200
start_time = datetime .utcnow ()
201
+ self ._update_module_status (
202
+ {module : {'start_time' : start_time ,
203
+ 'completion_time' : None ,
204
+ 'duration' : None }})
202
205
with open (module_logfile , "a" ) as f :
203
206
subprocess .check_call (command .split (' ' ), stdout = f )
204
207
completion_time = datetime .utcnow ()
@@ -207,6 +210,8 @@ def run_modules(self):
207
210
'completion_time' : completion_time ,
208
211
'duration' : (completion_time - start_time ).total_seconds ()}})
209
212
213
+ self ._update_total_duration ()
214
+
210
215
def _get_raw_data_filepaths (self ):
211
216
session_str , gate_str , _ , probe_str = self .parse_input_filename ()
212
217
@@ -248,10 +253,44 @@ def _get_module_status(self, module):
248
253
if self ._modules_input_hash_fp .exists ():
249
254
with open (self ._modules_input_hash_fp ) as f :
250
255
modules_status = json .load (f )
256
+ if modules_status [module ]['completion_time' ] is None :
257
+ # additional logic to read from the "-output.json" file for this module as well
258
+ # handle cases where the module has finished successfully,
259
+ # but the "_modules_input_hash_fp" is not updated (for whatever reason),
260
+ # resulting in this module not registered as completed in the "_modules_input_hash_fp"
261
+ module_output_json_fp = pathlib .Path (self ._get_module_output_json_filename (module ))
262
+ if module_output_json_fp .exists ():
263
+ with open (module_output_json_fp ) as f :
264
+ module_run_output = json .load (f )
265
+ modules_status [module ]['duration' ] = module_run_output ['execution_time' ]
266
+ modules_status [module ]['completion_time' ] = (
267
+ datetime .strptime (modules_status [module ]['start_time' ], '%Y-%m-%d %H:%M:%S.%f' )
268
+ + timedelta (seconds = module_run_output ['execution_time' ]))
251
269
return modules_status [module ]
252
270
253
271
return {'start_time' : None , 'completion_time' : None , 'duration' : None }
254
272
273
def _get_module_output_json_filename(self, module):
    """Return the path (as a POSIX string) of the "-output.json" file for ``module``.

    The name is derived from ``self._module_input_json`` by swapping its
    trailing "-input.json" for "-<module>-<modules_input_hash>-output.json".

    :param module: name of the module (str) whose output file is requested
    :return: output JSON path as a string
    """
    input_suffix = '-input.json'
    output_suffix = '-' + module + '-' + str(self._modules_input_hash) + '-output.json'
    module_input_json = self._module_input_json.as_posix()

    if module_input_json.endswith(input_suffix):
        # Swap only the trailing suffix: a blanket str.replace() would also
        # rewrite any parent directory whose name contains "-input.json".
        return module_input_json[:-len(input_suffix)] + output_suffix

    # Path does not end with the expected suffix: keep the previous behavior.
    return module_input_json.replace(input_suffix, output_suffix)
279
+
280
def _update_total_duration(self):
    """Store aggregate timing of the run via ``self._update_module_status``.

    Reads the status JSON at ``self._modules_input_hash_fp`` and records:

    - ``cumulative_execution_duration``: sum of every module entry's
      ``duration`` (a ``None`` duration counts as 0);
    - ``total_duration``: seconds between the ``start_time`` of the first
      module in ``self._modules`` and the ``completion_time`` of the last.

    :raises ValueError: if a timestamp matches neither accepted format
    """
    aggregate_keys = ('cumulative_execution_duration', 'total_duration')

    def _parse_timestamp(value):
        # Timestamps are read back from JSON as strings. Python renders a
        # datetime whose microsecond is 0 without the fractional part, so a
        # strict '.%f' format would reject it; accept both renderings.
        # NOTE(review): presumably the status file is written with
        # str(datetime) - confirm against `_update_module_status`.
        try:
            return datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
        except ValueError:
            return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')

    with open(self._modules_input_hash_fp) as f:
        modules_status = json.load(f)

    # Skip the aggregate entries themselves so a re-run does not double count.
    cumulative_execution_duration = sum(
        status['duration'] or 0
        for name, status in modules_status.items()
        if name not in aggregate_keys)

    first_module, last_module = self._modules[0], self._modules[-1]
    total_duration = (
        _parse_timestamp(modules_status[last_module]['completion_time'])
        - _parse_timestamp(modules_status[first_module]['start_time'])
    ).total_seconds()

    self._update_module_status(
        {'cumulative_execution_duration': cumulative_execution_duration,
         'total_duration': total_duration})
293
+
255
294
256
295
class OpenEphysKilosortPipeline :
257
296
"""
@@ -353,22 +392,27 @@ def run_modules(self):
353
392
if module_status ['completion_time' ] is not None :
354
393
continue
355
394
356
- module_output_json = module_input_json .replace ('-input.json' ,
357
- '-' + module + '-output.json' )
358
- command = (sys .executable
359
- + " -W ignore -m ecephys_spike_sorting.modules." + module
360
- + " --input_json " + module_input_json
361
- + " --output_json " + module_output_json )
395
+ module_output_json = self ._get_module_output_json_filename (module )
396
+ command = [sys .executable ,
397
+ '-W' , 'ignore' , '-m' , 'ecephys_spike_sorting.modules.' + module ,
398
+ '--input_json' , module_input_json ,
399
+ '--output_json' , module_output_json ]
362
400
363
401
start_time = datetime .utcnow ()
402
+ self ._update_module_status (
403
+ {module : {'start_time' : start_time ,
404
+ 'completion_time' : None ,
405
+ 'duration' : None }})
364
406
with open (module_logfile , "a" ) as f :
365
- subprocess .check_call (command . split ( ' ' ) , stdout = f )
407
+ subprocess .check_call (command , stdout = f )
366
408
completion_time = datetime .utcnow ()
367
409
self ._update_module_status (
368
410
{module : {'start_time' : start_time ,
369
411
'completion_time' : completion_time ,
370
412
'duration' : (completion_time - start_time ).total_seconds ()}})
371
413
414
+ self ._update_total_duration ()
415
+
372
416
def _update_module_status (self , updated_module_status = {}):
373
417
if self ._modules_input_hash is None :
374
418
raise RuntimeError ('"generate_modules_input_json()" not yet performed!' )
@@ -393,10 +437,44 @@ def _get_module_status(self, module):
393
437
if self ._modules_input_hash_fp .exists ():
394
438
with open (self ._modules_input_hash_fp ) as f :
395
439
modules_status = json .load (f )
440
+ if modules_status [module ]['completion_time' ] is None :
441
+ # additional logic to read from the "-output.json" file for this module as well
442
+ # handle cases where the module has finished successfully,
443
+ # but the "_modules_input_hash_fp" is not updated (for whatever reason),
444
+ # resulting in this module not registered as completed in the "_modules_input_hash_fp"
445
+ module_output_json_fp = pathlib .Path (self ._get_module_output_json_filename (module ))
446
+ if module_output_json_fp .exists ():
447
+ with open (module_output_json_fp ) as f :
448
+ module_run_output = json .load (f )
449
+ modules_status [module ]['duration' ] = module_run_output ['execution_time' ]
450
+ modules_status [module ]['completion_time' ] = (
451
+ datetime .strptime (modules_status [module ]['start_time' ], '%Y-%m-%d %H:%M:%S.%f' )
452
+ + timedelta (seconds = module_run_output ['execution_time' ]))
396
453
return modules_status [module ]
397
454
398
455
return {'start_time' : None , 'completion_time' : None , 'duration' : None }
399
456
457
def _get_module_output_json_filename(self, module):
    """Return the path (as a POSIX string) of the "-output.json" file for ``module``.

    The name is derived from ``self._module_input_json`` by swapping its
    trailing "-input.json" for "-<module>-<modules_input_hash>-output.json".

    :param module: name of the module (str) whose output file is requested
    :return: output JSON path as a string
    """
    input_suffix = '-input.json'
    output_suffix = '-' + module + '-' + str(self._modules_input_hash) + '-output.json'
    module_input_json = self._module_input_json.as_posix()

    if module_input_json.endswith(input_suffix):
        # Swap only the trailing suffix: a blanket str.replace() would also
        # rewrite any parent directory whose name contains "-input.json".
        return module_input_json[:-len(input_suffix)] + output_suffix

    # Path does not end with the expected suffix: keep the previous behavior.
    return module_input_json.replace(input_suffix, output_suffix)
463
+
464
def _update_total_duration(self):
    """Store aggregate timing of the run via ``self._update_module_status``.

    Reads the status JSON at ``self._modules_input_hash_fp`` and records:

    - ``cumulative_execution_duration``: sum of every module entry's
      ``duration`` (a ``None`` duration counts as 0);
    - ``total_duration``: seconds between the ``start_time`` of the first
      module in ``self._modules`` and the ``completion_time`` of the last.

    :raises ValueError: if a timestamp matches neither accepted format
    """
    aggregate_keys = ('cumulative_execution_duration', 'total_duration')

    def _parse_timestamp(value):
        # Timestamps are read back from JSON as strings. Python renders a
        # datetime whose microsecond is 0 without the fractional part, so a
        # strict '.%f' format would reject it; accept both renderings.
        # NOTE(review): presumably the status file is written with
        # str(datetime) - confirm against `_update_module_status`.
        try:
            return datetime.strptime(value, '%Y-%m-%d %H:%M:%S.%f')
        except ValueError:
            return datetime.strptime(value, '%Y-%m-%d %H:%M:%S')

    with open(self._modules_input_hash_fp) as f:
        modules_status = json.load(f)

    # Skip the aggregate entries themselves so a re-run does not double count.
    cumulative_execution_duration = sum(
        status['duration'] or 0
        for name, status in modules_status.items()
        if name not in aggregate_keys)

    first_module, last_module = self._modules[0], self._modules[-1]
    total_duration = (
        _parse_timestamp(modules_status[last_module]['completion_time'])
        - _parse_timestamp(modules_status[first_module]['start_time'])
    ).total_seconds()

    self._update_module_status(
        {'cumulative_execution_duration': cumulative_execution_duration,
         'total_duration': total_duration})
477
+
400
478
401
479
def run_pykilosort (continuous_file , kilosort_output_directory , params ,
402
480
channel_ind , x_coords , y_coords , shank_ind , connected , sample_rate ):
0 commit comments