from collections import OrderedDict
from pathlib import Path
import numpy as np

from ...utils import logger, fill_doc, verbose
from ..meas_info import create_info
from ...evoked import EvokedArray


@fill_doc
@verbose
def read_evoked_besa(fname, verbose=None):
    """Reader function for BESA .avr or .mul files.

    When a .elp sidecar file is present, it will be used to determine
    electrode information.

    Parameters
    ----------
    fname : str | Path
        Path to the .avr or .mul file.
    %(verbose)s

    Returns
    -------
    ev : Evoked
        The evoked data in the .avr or .mul file.
    """
    fname = Path(fname)
    if fname.suffix == '.avr':
        return _read_evoked_besa_avr(fname, verbose)
    elif fname.suffix == '.mul':
        return _read_evoked_besa_mul(fname, verbose)
    else:
        raise ValueError('Filename must end in either .avr or .mul')


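# A minimal usage sketch (the file name below is hypothetical; an .elp sidecar
# is only picked up if it sits next to the data file with the same stem):
#
#     ev = read_evoked_besa('sub-01_task-oddball.avr')
#     print(ev.info['ch_names'])
#     ev.plot()  # the returned object supports the usual Evoked API

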
@verbose
def _read_evoked_besa_avr(fname, verbose):
    """Create EvokedArray from a BESA .avr file."""
    with open(fname) as f:
        header = f.readline().strip()

        # There are two versions of .avr files. The old style, generated by
        # BESA 1, 2 and 3 does not define Nchan and does not have channel
        # names in the file.
        new_style = 'Nchan=' in header
        if new_style:
            ch_names = f.readline().strip().split()
        else:
            ch_names = None
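    # A new-style header looks like this (the channel names then follow on
    # the next line):
    #     Npts= 256 TSB= 0.000 DI= 4.000000 SB= 1.000 SC= 200.0 Nchan= 27
    # Old-style files omit the Nchan= field and the channel-name line.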

    fields = _parse_header(header)
    data = np.loadtxt(fname, skiprows=2 if new_style else 1, ndmin=2)
    ch_types = _read_elp_sidecar(fname)

    # Consolidate channel names
    if new_style:
        if len(ch_names) != len(data):
            raise RuntimeError(
                'Mismatch between the number of channel names defined in '
                f'the .avr file ({len(ch_names)}) and the number of rows '
                f'in the data matrix ({len(data)}).')
    else:
        # Determine channel names from the .elp sidecar file
        if ch_types is not None:
            ch_names = list(ch_types.keys())
            if len(ch_names) != len(data):
                raise RuntimeError('Mismatch between the number of channels '
                                   f'defined in the .avr file ({len(data)}) '
                                   f'and .elp file ({len(ch_names)}).')
        else:
            logger.info('No .elp file found and no channel names present in '
                        'the .avr file. Falling back to generic names.')
            ch_names = [f'CH{i + 1:02d}' for i in range(len(data))]

    # Consolidate channel types
    if ch_types is None:
        logger.info('Marking all channels as EEG.')
        ch_types = ['eeg'] * len(ch_names)
    else:
        ch_types = [ch_types[ch] for ch in ch_names]

    # Go over all the header fields and make sure they are all defined to
    # something sensible.
    if 'Npts' in fields:
        fields['Npts'] = int(fields['Npts'])
        if fields['Npts'] != data.shape[1]:
            logger.warning(
                f'The size of the data matrix ({data.shape}) does not '
                f'match the "Npts" field ({fields["Npts"]}).')
    if 'Nchan' in fields:
        fields['Nchan'] = int(fields['Nchan'])
        if fields['Nchan'] != data.shape[0]:
            logger.warning(
                f'The size of the data matrix ({data.shape}) does not '
                f'match the "Nchan" field ({fields["Nchan"]}).')
    if 'DI' in fields:
        fields['DI'] = float(fields['DI'])
    else:
        raise RuntimeError('No "DI" field present. Could not determine '
                           'sampling frequency.')
    if 'TSB' in fields:
        fields['TSB'] = float(fields['TSB'])
    else:
        fields['TSB'] = 0
    if 'SB' in fields:
        fields['SB'] = float(fields['SB'])
    else:
        fields['SB'] = 1.0
    if 'SegmentName' not in fields:
        fields['SegmentName'] = ''

    # Build the Evoked object based on the header fields.
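    # Interpretation of the fields used below (inferred from the conversions):
    # "DI" is the sampling interval in ms, so sfreq = 1000 / DI (e.g. DI= 4.0
    # gives 250 Hz); the samples are divided by "SB" and by 1e6, i.e. they are
    # treated as scaled microvolts; "TSB" is the sweep start in ms and becomes
    # ``tmin`` in seconds.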
    info = create_info(ch_names, sfreq=1000 / fields['DI'], ch_types=ch_types)
    return EvokedArray(data / fields['SB'] / 1E6, info,
                       tmin=fields['TSB'] / 1000,
                       comment=fields['SegmentName'], verbose=verbose)


@verbose
def _read_evoked_besa_mul(fname, verbose):
    """Create EvokedArray from a BESA .mul file."""
    with open(fname) as f:
        header = f.readline().strip()
        ch_names = f.readline().strip().split()

    fields = _parse_header(header)
    data = np.loadtxt(fname, skiprows=2, ndmin=2)

    if len(ch_names) != data.shape[1]:
        raise RuntimeError('Mismatch between the number of channel names '
                           f'defined in the .mul file ({len(ch_names)}) '
                           'and the number of columns in the data matrix '
                           f'({data.shape[1]}).')

    # Consolidate channel types
    ch_types = _read_elp_sidecar(fname)
    if ch_types is None:
        logger.info('Marking all channels as EEG.')
        ch_types = ['eeg'] * len(ch_names)
    else:
        ch_types = [ch_types[ch] for ch in ch_names]

    # Go over all the header fields and make sure they are all defined to
    # something sensible.
    if 'TimePoints' in fields:
        fields['TimePoints'] = int(fields['TimePoints'])
        if fields['TimePoints'] != data.shape[0]:
            logger.warning(
                f'The size of the data matrix ({data.shape}) does not '
                f'match the "TimePoints" field ({fields["TimePoints"]}).')
    if 'Channels' in fields:
        fields['Channels'] = int(fields['Channels'])
        if fields['Channels'] != data.shape[1]:
            logger.warning(
                f'The size of the data matrix ({data.shape}) does not '
                f'match the "Channels" field ({fields["Channels"]}).')
    if 'SamplingInterval[ms]' in fields:
        fields['SamplingInterval[ms]'] = float(fields['SamplingInterval[ms]'])
    else:
        raise RuntimeError('No "SamplingInterval[ms]" field present. Could '
                           'not determine sampling frequency.')
    if 'BeginSweep[ms]' in fields:
        fields['BeginSweep[ms]'] = float(fields['BeginSweep[ms]'])
    else:
        fields['BeginSweep[ms]'] = 0.0
    if 'Bins/uV' in fields:
        fields['Bins/uV'] = float(fields['Bins/uV'])
    else:
        fields['Bins/uV'] = 1
    if 'SegmentName' not in fields:
        fields['SegmentName'] = ''

    # Build the Evoked object based on the header fields.
    info = create_info(ch_names, sfreq=1000 / fields['SamplingInterval[ms]'],
                       ch_types=ch_types)
    return EvokedArray(data.T / fields['Bins/uV'] / 1E6, info,
                       tmin=fields['BeginSweep[ms]'] / 1000,
                       comment=fields['SegmentName'], verbose=verbose)


def _parse_header(header):
    """Parse an .avr or .mul header string into name/val pairs.

    The header line looks like:
        Npts= 256 TSB= 0.000 DI= 4.000000 SB= 1.000 SC= 200.0 Nchan= 27
    There is no consistent use of separator characters, so parsing this is a
    bit fragile.

    Parameters
    ----------
    header : str
        The first line of the file.

    Returns
    -------
    fields : dict
        The parsed header fields.
    """
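    # For example, the header line shown above parses to (all values remain
    # strings at this point):
    #     {'Npts': '256', 'TSB': '0.000', 'DI': '4.000000', 'SB': '1.000',
    #      'SC': '200.0', 'Nchan': '27'}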
    parts = header.split()  # Splits on one or more whitespace characters
    name_val_pairs = zip(parts[::2], parts[1::2])
    return dict((name.replace('=', ''), val) for name, val in name_val_pairs)


def _read_elp_sidecar(fname):
    """Read a possible .elp sidecar file with electrode information.

    The reason we don't use ``read_custom_montage`` for this is that we are
    interested in the channel types, which a DigMontage object does not
    provide.

    Parameters
    ----------
    fname : Path
        The path of the .avr or .mul file. The corresponding .elp file will be
        derived from this path.

    Returns
    -------
    ch_types : OrderedDict | None
        If the sidecar file exists, a dictionary mapping channel names to
        channel types. Otherwise ``None``.
    """
    fname_elp = fname.parent / (fname.stem + '.elp')
    if not fname_elp.exists():
        logger.info(f'No {fname_elp} file present containing electrode '
                    'information.')
        return None

    logger.info(f'Reading electrode names and types from {fname_elp}')
    ch_types = OrderedDict()
    with open(fname_elp) as f:
        lines = f.readlines()
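    # Two layouts are handled below; the example lines are hypothetical:
    #     EEG   Fp1   -92.0  -72.0    -> first column gives the channel type
    #     Fp1   -92.0  -72.0          -> positions only, all channels are EEG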
    if len(lines[0].split()) > 3:
        # Channel types present
        for line in lines:
            ch_type, ch_name = line.split()[:2]
            ch_types[ch_name] = ch_type.lower()
    else:
        # No channel types present
        logger.info('No channel types present in .elp file. Marking all '
                    'channels as EEG.')
        for line in lines:
            ch_name = line.split()[0]
            ch_types[ch_name] = 'eeg'
    return ch_types