Skip to content

Commit c153e7f

Browse files
authored
Merge pull request #89 from ttngu207/no-curation
implement data compression using `mtscomp` for openephys and spikeglx for neuropixels data
2 parents ad8436e + d8aea04 commit c153e7f

File tree

5 files changed

+184
-6
lines changed

5 files changed

+184
-6
lines changed

element_array_ephys/ephys_acute.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import numpy as np
55
import inspect
66
import importlib
7+
import gc
78
from decimal import Decimal
89

910
from element_interface.utils import find_root_directory, find_full_path, dict_to_uuid
@@ -326,6 +327,10 @@ def make(self, key):
326327
self.EphysFile.insert([{**key,
327328
'file_path': fp.relative_to(root_dir).as_posix()}
328329
for fp in probe_data.recording_info['recording_files']])
330+
# explicitly garbage collect "dataset"
331+
# as these may have large memory footprint and may not be cleared fast enough
332+
del probe_data, dataset
333+
gc.collect()
329334
else:
330335
raise NotImplementedError(f'Processing ephys files from'
331336
f' acquisition software of type {acq_software} is'
@@ -919,7 +924,14 @@ def get_openephys_probe_data(ephys_recording_key):
919924
session_dir = find_full_path(get_ephys_root_data_dir(),
920925
get_session_directory(ephys_recording_key))
921926
loaded_oe = openephys.OpenEphys(session_dir)
922-
return loaded_oe.probes[inserted_probe_serial_number]
927+
probe_data = loaded_oe.probes[inserted_probe_serial_number]
928+
929+
# explicitly garbage collect "loaded_oe"
930+
# as these may have large memory footprint and may not be cleared fast enough
931+
del loaded_oe
932+
gc.collect()
933+
934+
return probe_data
923935

924936

925937
def get_neuropixels_channel2electrode_map(ephys_recording_key, acq_software):

element_array_ephys/ephys_chronic.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import numpy as np
55
import inspect
66
import importlib
7+
import gc
78
from decimal import Decimal
89

910
from element_interface.utils import find_root_directory, find_full_path, dict_to_uuid
@@ -273,6 +274,10 @@ def make(self, key):
273274
self.EphysFile.insert([{**key,
274275
'file_path': fp.relative_to(root_dir).as_posix()}
275276
for fp in probe_data.recording_info['recording_files']])
277+
# explicitly garbage collect "dataset"
278+
# as these may have large memory footprint and may not be cleared fast enough
279+
del probe_data, dataset
280+
gc.collect()
276281
else:
277282
raise NotImplementedError(f'Processing ephys files from'
278283
f' acquisition software of type {acq_software} is'
@@ -862,10 +867,17 @@ def get_spikeglx_meta_filepath(ephys_recording_key):
862867
def get_openephys_probe_data(ephys_recording_key):
    """Load and return the OpenEphys probe data for one ephys recording.

    Args:
        ephys_recording_key: restriction key identifying the recording.

    Returns:
        The probe object matching the inserted probe's serial number.
    """
    serial_number = (ProbeInsertion * probe.Probe
                     & ephys_recording_key).fetch1('probe')
    session_dir = find_full_path(get_ephys_root_data_dir(),
                                 get_session_directory(ephys_recording_key))
    loaded_oe = openephys.OpenEphys(session_dir)
    probe_data = loaded_oe.probes[serial_number]

    # Drop the loader object and force a collection pass - the loaded
    # recording may have a large memory footprint and may not be cleared
    # fast enough otherwise.
    del loaded_oe
    gc.collect()

    return probe_data
869881

870882

871883
def get_neuropixels_channel2electrode_map(ephys_recording_key, acq_software):

element_array_ephys/ephys_no_curation.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import numpy as np
55
import inspect
66
import importlib
7+
import gc
78
from decimal import Decimal
89

910
from element_interface.utils import find_root_directory, find_full_path, dict_to_uuid
@@ -325,6 +326,10 @@ def make(self, key):
325326
self.EphysFile.insert([{**key,
326327
'file_path': fp.relative_to(root_dir).as_posix()}
327328
for fp in probe_data.recording_info['recording_files']])
329+
# explicitly garbage collect "dataset"
330+
# as these may have large memory footprint and may not be cleared fast enough
331+
del probe_data, dataset
332+
gc.collect()
328333
else:
329334
raise NotImplementedError(f'Processing ephys files from'
330335
f' acquisition software of type {acq_software} is'
@@ -877,7 +882,14 @@ def get_openephys_probe_data(ephys_recording_key):
877882
session_dir = find_full_path(get_ephys_root_data_dir(),
878883
get_session_directory(ephys_recording_key))
879884
loaded_oe = openephys.OpenEphys(session_dir)
880-
return loaded_oe.probes[inserted_probe_serial_number]
885+
probe_data = loaded_oe.probes[inserted_probe_serial_number]
886+
887+
# explicitly garbage collect "loaded_oe"
888+
# as these may have large memory footprint and may not be cleared fast enough
889+
del loaded_oe
890+
gc.collect()
891+
892+
return probe_data
881893

882894

883895
def get_neuropixels_channel2electrode_map(ephys_recording_key, acq_software):

element_array_ephys/readers/openephys.py

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import numpy as np
44
import re
55
import datetime
6+
import logging
7+
8+
logger = logging.getLogger(__name__)
69

710

811
"""
@@ -155,6 +158,9 @@ def load_probe_data(self):
155158
float(rec.duration))
156159
probe.recording_info['recording_files'].append(
157160
rec.absolute_foldername / 'continuous' / continuous_info['folder_name'])
161+
elif continuous_type == 'lfp':
162+
probe.recording_info['recording_lfp_files'].append(
163+
rec.absolute_foldername / 'continuous' / continuous_info['folder_name'])
158164

159165
meta = getattr(probe, continuous_type + '_meta')
160166
if not meta:
@@ -229,7 +235,8 @@ def __init__(self, processor, probe_index=0):
229235
self.recording_info = {'recording_count': 0,
230236
'recording_datetimes': [],
231237
'recording_durations': [],
232-
'recording_files': []}
238+
'recording_files': [],
239+
'recording_lfp_files': []}
233240

234241
self._ap_timeseries = None
235242
self._ap_timestamps = None
@@ -303,3 +310,73 @@ def extract_spike_waveforms(self, spikes, channel_ind, n_wf=500, wf_win=(-32, 32
303310
return spike_wfs
304311
else: # if no spike found, return NaN of size (sample x channel x 1)
305312
return np.full((len(range(*wf_win)), len(channel_ind), 1), np.nan)
313+
314+
def compress(self):
315+
from mtscomp import compress as mts_compress
316+
317+
ap_dirs = self.recording_info['recording_files']
318+
lfp_dirs = self.recording_info['recording_lfp_files']
319+
320+
meta_mapping = {'ap': self.ap_meta, 'lfp': self.lfp_meta}
321+
322+
compressed_files = []
323+
for continuous_dir, continuous_type in zip(
324+
ap_dirs + lfp_dirs,
325+
['ap'] * len(ap_dirs) + ['lfp'] * len(lfp_dirs)):
326+
dat_fp = continuous_dir / 'continuous.dat'
327+
if not dat_fp.exists():
328+
raise FileNotFoundError(f'Compression error - "{dat_fp}" does not exist')
329+
cdat_fp = continuous_dir / 'continuous.cdat'
330+
ch_fp = continuous_dir / 'continuous.ch'
331+
332+
if cdat_fp.exists():
333+
assert ch_fp.exists()
334+
logger.info(f'Compressed file exists ({cdat_fp}), skipping...')
335+
continue
336+
337+
try:
338+
mts_compress(dat_fp, cdat_fp, ch_fp,
339+
sample_rate=meta_mapping[continuous_type]['sample_rate'],
340+
n_channels=meta_mapping[continuous_type]['num_channels'],
341+
dtype=np.memmap(dat_fp).dtype)
342+
except Exception as e:
343+
cdat_fp.unlink(missing_ok=True)
344+
ch_fp.unlink(missing_ok=True)
345+
raise e
346+
else:
347+
compressed_files.append((cdat_fp, ch_fp))
348+
349+
return compressed_files
350+
351+
def decompress(self):
352+
from mtscomp import decompress as mts_decompress
353+
354+
ap_dirs = self.recording_info['recording_files']
355+
lfp_dirs = self.recording_info['recording_lfp_files']
356+
357+
decompressed_files = []
358+
for continuous_dir, continuous_type in zip(
359+
ap_dirs + lfp_dirs,
360+
['ap'] * len(ap_dirs) + ['lfp'] * len(lfp_dirs)):
361+
dat_fp = continuous_dir / 'continuous.dat'
362+
363+
if dat_fp.exists():
364+
logger.info(f'Decompressed file exists ({dat_fp}), skipping...')
365+
continue
366+
367+
cdat_fp = continuous_dir / 'continuous.cdat'
368+
ch_fp = continuous_dir / 'continuous.ch'
369+
370+
if not cdat_fp.exists():
371+
raise FileNotFoundError(f'Decompression error - "{cdat_fp}" does not exist')
372+
373+
try:
374+
decomp_arr = mts_decompress(cdat_fp, ch_fp)
375+
decomp_arr.tofile(dat_fp)
376+
except Exception as e:
377+
dat_fp.unlink(missing_ok=True)
378+
raise e
379+
else:
380+
decompressed_files.append(dat_fp)
381+
382+
return decompressed_files

element_array_ephys/readers/spikeglx.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
from datetime import datetime
22
import numpy as np
33
import pathlib
4+
import logging
45
from .utils import convert_to_number
56

7+
logger = logging.getLogger(__name__)
68

79
AP_GAIN = 80 # For NP 2.0 probes; APGain = 80 for all AP (LF is computed from AP)
810

@@ -159,6 +161,69 @@ def validate_file(self, file_type='ap'):
159161
if file_size != meta.meta['fileSizeBytes']:
160162
raise IOError(f'File size error! {file_path} may be corrupted or in transfer?')
161163

164+
def compress(self):
165+
from mtscomp import compress as mts_compress
166+
167+
ap_file = self.root_dir / (self.root_name + '.ap.bin')
168+
lfp_file = self.root_dir / (self.root_name + '.lf.bin')
169+
170+
meta_mapping = {'ap': self.apmeta, 'lfp': self.lfmeta}
171+
172+
compressed_files = []
173+
for bin_fp, band_type in zip([ap_file, lfp_file], ['ap', 'lfp']):
174+
if not bin_fp.exists():
175+
raise FileNotFoundError(f'Compression error - "{bin_fp}" does not exist')
176+
cbin_fp = bin_fp.parent / f'{bin_fp.stem}.cbin'
177+
ch_fp = bin_fp.parent / f'{bin_fp.stem}.ch'
178+
179+
if cbin_fp.exists():
180+
assert ch_fp.exists()
181+
logger.info(f'Compressed file exists ({cbin_fp}), skipping...')
182+
continue
183+
184+
try:
185+
mts_compress(bin_fp, cbin_fp, ch_fp,
186+
sample_rate=meta_mapping[band_type]['sample_rate'],
187+
n_channels=meta_mapping[band_type]['num_channels'],
188+
dtype=np.memmap(bin_fp).dtype)
189+
except Exception as e:
190+
cbin_fp.unlink(missing_ok=True)
191+
ch_fp.unlink(missing_ok=True)
192+
raise e
193+
else:
194+
compressed_files.append((cbin_fp, ch_fp))
195+
196+
return compressed_files
197+
198+
def decompress(self):
199+
from mtscomp import decompress as mts_decompress
200+
201+
ap_file = self.root_dir / (self.root_name + '.ap.bin')
202+
lfp_file = self.root_dir / (self.root_name + '.lf.bin')
203+
204+
decompressed_files = []
205+
for bin_fp, band_type in zip([ap_file, lfp_file], ['ap', 'lfp']):
206+
if bin_fp.exists():
207+
logger.info(f'Decompressed file exists ({bin_fp}), skipping...')
208+
continue
209+
210+
cbin_fp = bin_fp.parent / f'{bin_fp.stem}.cbin'
211+
ch_fp = bin_fp.parent / f'{bin_fp.stem}.ch'
212+
213+
if not cbin_fp.exists():
214+
raise FileNotFoundError(f'Decompression error - "{cbin_fp}" does not exist')
215+
216+
try:
217+
decomp_arr = mts_decompress(cbin_fp, ch_fp)
218+
decomp_arr.tofile(bin_fp)
219+
except Exception as e:
220+
bin_fp.unlink(missing_ok=True)
221+
raise e
222+
else:
223+
decompressed_files.append(bin_fp)
224+
225+
return decompressed_files
226+
162227

163228
class SpikeGLXMeta:
164229

0 commit comments

Comments
 (0)