diff --git a/azure-pipelines.yml b/azure-pipelines.yml index c6a36af99d0..f632cc8fe7d 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -113,7 +113,7 @@ stages: - bash: | set -e python -m pip install --progress-bar off --upgrade pip - python -m pip install --progress-bar off "mne-qt-browser[opengl] @ git+https://github.com/mne-tools/mne-qt-browser.git" pyvista scikit-learn python-picard qtpy nibabel sphinx-gallery "PySide6!=6.8.0,!=6.8.0.1,!=6.8.1.1,!=6.9.1" pandas neo pymatreader antio defusedxml + python -m pip install --progress-bar off "mne-qt-browser[opengl] @ git+https://github.com/mne-tools/mne-qt-browser.git" pyvista scikit-learn python-picard qtpy nibabel sphinx-gallery "PySide6!=6.8.0,!=6.8.0.1,!=6.8.1.1,!=6.9.1" pandas neo pymatreader antio defusedxml curryreader python -m pip uninstall -yq mne python -m pip install --progress-bar off --upgrade -e .[test] displayName: 'Install dependencies with pip' diff --git a/doc/changes/devel/13176.dependency.rst b/doc/changes/devel/13176.dependency.rst new file mode 100644 index 00000000000..713ce3ba502 --- /dev/null +++ b/doc/changes/devel/13176.dependency.rst @@ -0,0 +1 @@ +New reader for Neuroscan Curry files, using the curry-python-reader module, by `Dominik Welke`_. \ No newline at end of file diff --git a/environment.yml b/environment.yml index 30036dc8187..b9cc85ef444 100644 --- a/environment.yml +++ b/environment.yml @@ -5,6 +5,7 @@ channels: dependencies: - python >=3.10 - antio >=0.5.0 + - curryreader >=0.1.1 - darkdetect - decorator - defusedxml diff --git a/mne/io/curry/__init__.py b/mne/io/curry/__init__.py index fce6b7d9a32..d71f0465db4 100644 --- a/mne/io/curry/__init__.py +++ b/mne/io/curry/__init__.py @@ -5,3 +5,5 @@ # Copyright the MNE-Python contributors. from .curry import read_raw_curry +from .curry import read_impedances_curry +from .curry import read_montage_curry diff --git a/mne/io/curry/curry.py b/mne/io/curry/curry.py index 3e8347fba0d..18344420378 100644 --- a/mne/io/curry/curry.py +++ b/mne/io/curry/curry.py @@ -3,9 +3,7 @@ # License: BSD-3-Clause # Copyright the MNE-Python contributors. 
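For orientation, a minimal usage sketch of the API this PR wires up (the recording path is a placeholder; `read_raw_curry` is the existing public reader, and the two helpers are the new exports from `mne/io/curry/__init__.py` above — the pair returned by `read_impedances_curry` is inferred from the tests further down):

```python
# Hedged sketch, not part of the diff: exercise the new curryreader-based API.
from mne.io import read_raw_curry
from mne.io.curry import read_impedances_curry, read_montage_curry

fname = "recording.cdt"  # placeholder; any Curry .dat/.cdt recording

# continuous recordings load as Raw; epoched files can instead be unrolled
# into an annotated continuous object with import_epochs_as_events=True
raw = read_raw_curry(fname, preload=True)

mont = read_montage_curry(fname)       # DigMontage from the sensor positions
_, imp = read_impedances_curry(fname)  # unpacked as a pair in the tests below
```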
-import os.path as op import re -from collections import namedtuple from datetime import datetime, timezone from pathlib import Path @@ -16,530 +14,531 @@ from ..._fiff.meas_info import create_info from ..._fiff.tag import _coil_trans_to_loc from ..._fiff.utils import _mult_cal_one, _read_segments_file -from ...annotations import Annotations +from ...annotations import annotations_from_events +from ...channels import make_dig_montage +from ...epochs import Epochs from ...surface import _normal_orth -from ...transforms import ( - Transform, - _angle_between_quats, - apply_trans, - combine_transforms, - get_ras_to_neuromag_trans, - invert_transform, - rot_to_quat, +from ...transforms import Transform, apply_trans +from ...utils import ( + _soft_import, + verbose, + warn, ) -from ...utils import _check_fname, check_fname, logger, verbose from ..base import BaseRaw -from ..ctf.trans import _quaternion_align - -FILE_EXTENSIONS = { - "Curry 7": { - "info": ".dap", - "data": ".dat", - "labels": ".rs3", - "events_cef": ".cef", - "events_ceo": ".ceo", - "hpi": ".hpi", - }, - "Curry 8": { - "info": ".cdt.dpa", - "data": ".cdt", - "labels": ".cdt.dpa", - "events_cef": ".cdt.cef", - "events_ceo": ".cdt.ceo", - "hpi": ".cdt.hpi", - }, -} -CHANTYPES = {"meg": "_MAG1", "eeg": "", "misc": "_OTHERS"} -FIFFV_CHANTYPES = { - "meg": FIFF.FIFFV_MEG_CH, - "eeg": FIFF.FIFFV_EEG_CH, - "misc": FIFF.FIFFV_MISC_CH, -} -FIFFV_COILTYPES = { - "meg": FIFF.FIFFV_COIL_CTF_GRAD, - "eeg": FIFF.FIFFV_COIL_EEG, - "misc": FIFF.FIFFV_COIL_NONE, -} -SI_UNITS = dict(V=FIFF.FIFF_UNIT_V, T=FIFF.FIFF_UNIT_T) -SI_UNIT_SCALE = dict(c=1e-2, m=1e-3, u=1e-6, µ=1e-6, n=1e-9, p=1e-12, f=1e-15) - -CurryParameters = namedtuple( - "CurryParameters", - "n_samples, sfreq, is_ascii, unit_dict, n_chans, dt_start, chanidx_in_file", -) + +CURRY_SUFFIX_DATA = [".cdt", ".dat"] +CURRY_SUFFIX_HDR = [".cdt.dpa", ".cdt.dpo", ".dap"] +CURRY_SUFFIX_LABELS = [".cdt.dpa", ".cdt.dpo", ".rs3"] -def _get_curry_version(file_extension): - """Check out the curry file version.""" - return "Curry 8" if "cdt" in file_extension else "Curry 7" +def _check_curry_filename(fname): + fname_in = Path(fname) + fname_out = None + # try suffixes + if fname_in.suffix in CURRY_SUFFIX_DATA: + fname_out = fname_in + else: + for data_suff in CURRY_SUFFIX_DATA: + if fname_in.with_suffix(data_suff).exists(): + fname_out = fname_in.with_suffix(data_suff) + break + # final check + if not fname_out or not fname_out.exists(): + raise FileNotFoundError("no curry data file found (.dat or .cdt)") + return fname_out + + +def _check_curry_header_filename(fname): + fname_in = Path(fname) + fname_hdr = None + # try suffixes + for hdr_suff in CURRY_SUFFIX_HDR: + if fname_in.with_suffix(hdr_suff).exists(): + fname_hdr = fname_in.with_suffix(hdr_suff) + break + # final check + if not fname_hdr or not fname_in.exists(): + raise FileNotFoundError( + f"no corresponding header file found {CURRY_SUFFIX_HDR}" + ) + return fname_hdr -def _get_curry_file_structure(fname, required=()): - """Store paths to a dict and check for required files.""" - _msg = ( - "The following required files cannot be found: {0}.\nPlease make " - "sure all required files are located in the same directory as {1}." 
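A toy standalone sketch of the suffix probing that replaces the old `FILE_EXTENSIONS` table (hypothetical helper name, same logic as `_check_curry_filename` above):

```python
from pathlib import Path

CURRY_SUFFIX_DATA = [".cdt", ".dat"]  # Curry 8 / Curry 7 data files

def find_curry_data_file(fname):
    """Resolve the data file next to any Curry file handed in."""
    p = Path(fname)
    out = p if p.suffix in CURRY_SUFFIX_DATA else None
    if out is None:
        for suff in CURRY_SUFFIX_DATA:
            if p.with_suffix(suff).exists():  # e.g. "rec.dap" -> "rec.dat"
                out = p.with_suffix(suff)
                break
    if out is None or not out.exists():
        raise FileNotFoundError("no curry data file found (.dat or .cdt)")
    return out
```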
+def _check_curry_labels_filename(fname): + fname_in = Path(fname) + fname_labels = None + # try suffixes + for hdr_suff in CURRY_SUFFIX_LABELS: + if fname_in.with_suffix(hdr_suff).exists(): + fname_labels = fname_in.with_suffix(hdr_suff) + break + # final check + if not fname_labels or not fname_in.exists(): + raise FileNotFoundError( + f"no corresponding labels file found {CURRY_SUFFIX_LABELS}" + ) + return fname_labels + + +def _get_curry_meas_info(fname): + # get other essential info not provided by curryreader + fname_hdr = _check_curry_header_filename(fname) + content_hdr = fname_hdr.read_text() + + # read meas_date + meas_date = [ + int(re.compile(rf"{v}\s*=\s*-?\d+").search(content_hdr).group(0).split()[-1]) + for v in [ + "StartYear", + "StartMonth", + "StartDay", + "StartHour", + "StartMin", + "StartSec", + "StartMillisec", + ] + ] + try: + meas_date = datetime( + *meas_date[:-1], + meas_date[-1] * 1000, # -> microseconds + timezone.utc, + ) + except Exception: + meas_date = None + + # read datatype + byteorder = ( + re.compile(r"DataByteOrder\s*=\s*[A-Z]+") + .search(content_hdr) + .group() + .split()[-1] + ) + is_ascii = byteorder == "ASCII" + + # amp info + # TODO - seems like there can be identifiable information (serial numbers, dates). + # MNE anonymization functions only overwrite "serial" and "site", though + # TODO - there can be filter details, too + amp_info = ( + re.compile(r"AmplifierInfo\s*=.*\n") + .search(content_hdr) + .group() + .strip("\n") + .split("= ")[-1] + .strip() ) - fname = Path(_check_fname(fname, "read", True, "fname")) - - # we don't use os.path.splitext to also handle extensions like .cdt.dpa - # this won't handle a dot in the filename, but it should handle it in - # the parent directories - fname_base = fname.name.split(".", maxsplit=1)[0] - ext = fname.name[len(fname_base) :] - fname_base = str(fname) - fname_base = fname_base[: len(fname_base) - len(ext)] - del fname - version = _get_curry_version(ext) - my_curry = dict() - for key in ("info", "data", "labels", "events_cef", "events_ceo", "hpi"): - fname = fname_base + FILE_EXTENSIONS[version][key] - if op.isfile(fname): - _key = "events" if key.startswith("events") else key - my_curry[_key] = fname - - missing = [field for field in required if field not in my_curry] - if missing: - raise FileNotFoundError(_msg.format(np.unique(missing), fname)) - - return my_curry - - -def _read_curry_lines(fname, regex_list): - """Read through the lines of a curry parameter files and save data. - Parameters - ---------- - fname : path-like - Path to a curry file. - regex_list : list of str - A list of strings or regular expressions to search within the file. - Each element `regex` in `regex_list` must be formulated so that - `regex + " START_LIST"` initiates the start and `regex + " END_LIST"` - initiates the end of the elements that should be saved. + device_info = ( + dict(type=amp_info) if amp_info != "" else None # model="", serial="", site="" + ) Returns ------- data_dict : dict A dictionary containing the extracted data. For each element `regex` in `regex_list` a dictionary key `data_dict[regex]` is created, which contains a list of the according data.
+ return meas_date, is_ascii, device_info - """ - save_lines = {} - data_dict = {} - for regex in regex_list: - save_lines[regex] = False - data_dict[regex] = [] +def _get_curry_recording_type(fname): + _soft_import("curryreader", "read recording modality") + + import curryreader - with open(fname) as fid: - for line in fid: - for regex in regex_list: - if re.match(regex + " END_LIST", line): - save_lines[regex] = False + epochinfo = curryreader.read(str(fname), plotdata=0, verbosity=1)["epochinfo"] + if epochinfo.size == 0: + return "raw" + else: + n_average = epochinfo[:, 0] + if (n_average == 1).all(): + return "epochs" + else: + return "evoked" - if save_lines[regex] and line != "\n": - result = line.replace("\n", "") - if "\t" in result: - result = result.split("\t") - data_dict[regex].append(result) - if re.match(regex + " START_LIST", line): - save_lines[regex] = True +def _get_curry_epoch_info(fname): + _soft_import("curryreader", "read epoch info") + _soft_import("pandas", "dataframe integration") - return data_dict + import curryreader + import pandas as pd + # use curry-python-reader + currydata = curryreader.read(str(fname), plotdata=0, verbosity=1) -def _read_curry_parameters(fname): - """Extract Curry params from a Curry info file.""" - _msg_match = ( - "The sampling frequency and the time steps extracted from " - "the parameter file do not match." + # get epoch info + sfreq = currydata["info"]["samplingfreq"] + n_samples = currydata["info"]["samples"] + n_epochs = len(currydata["epochlabels"]) + epochinfo = currydata["epochinfo"] + epochtypes = epochinfo[:, 2].astype(int).tolist() + epochlabels = currydata["epochlabels"] + epochmetainfo = pd.DataFrame( + epochinfo[:, -4:], columns=["accept", "correct", "response", "response time"] ) - _msg_invalid = "sfreq must be greater than 0. 
Got sfreq = {0}" - - var_names = [ - "NumSamples", - "SampleFreqHz", - "DataFormat", - "SampleTimeUsec", - "NumChannels", - "StartYear", - "StartMonth", - "StartDay", - "StartHour", - "StartMin", - "StartSec", - "StartMillisec", - "NUM_SAMPLES", - "SAMPLE_FREQ_HZ", - "DATA_FORMAT", - "SAMPLE_TIME_USEC", - "NUM_CHANNELS", - "START_YEAR", - "START_MONTH", - "START_DAY", - "START_HOUR", - "START_MIN", - "START_SEC", - "START_MILLISEC", + # create mne events + events = np.array( + [[i * n_samples for i in range(n_epochs)], [0] * n_epochs, epochtypes] + ).T + event_id = dict(zip(epochlabels, epochtypes)) + return dict( + events=events, + event_id=event_id, + tmin=0.0, + tmax=(n_samples - 1) / sfreq, + baseline=(0, 0), + metadata=epochmetainfo, + reject_by_annotation=False, + ) + + +def _get_curry_meg_normals(fname): + fname_hdr = _check_curry_header_filename(fname) + normals_str = fname_hdr.read_text().split("\n") + # i_start, i_stop = [ + # i + # for i, ll in enumerate(normals_str) + # if ("NORMALS" in ll and "START_LIST" in ll) + # or ("NORMALS" in ll and "END_LIST" in ll) + # ] + # normals_str = [nn.split("\t") for nn in normals_str[i_start + 1 : i_stop]] + i_list = [ + i + for i, ll in enumerate(normals_str) + if ("NORMALS" in ll and "START_LIST" in ll) + or ("NORMALS" in ll and "END_LIST" in ll) + ] + assert len(i_list) % 2 == 0 + i_start_list = i_list[::2] + i_stop_list = i_list[1::2] + normals_str = [ + nn.split("\t") + for i_start, i_stop in zip(i_start_list, i_stop_list) + for nn in normals_str[i_start + 1 : i_stop] ] + return np.array([[float(nnn.strip()) for nnn in nn] for nn in normals_str]) - param_dict = dict() - unit_dict = dict() - - with open(fname) as fid: - for line in iter(fid): - if any(var_name in line for var_name in var_names): - key, val = line.replace(" ", "").replace("\n", "").split("=") - param_dict[key.lower().replace("_", "")] = val - for key, type_ in CHANTYPES.items(): - if f"DEVICE_PARAMETERS{type_} START" in line: - data_unit = next(fid) - unit_dict[key] = ( - data_unit.replace(" ", "").replace("\n", "").split("=")[-1] - ) - - # look for CHAN_IN_FILE sections, which may or may not exist; issue #8391 - types = ["meg", "eeg", "misc"] - chanidx_in_file = _read_curry_lines( - fname, ["CHAN_IN_FILE" + CHANTYPES[key] for key in types] - ) - n_samples = int(param_dict["numsamples"]) - sfreq = float(param_dict["samplefreqhz"]) - time_step = float(param_dict["sampletimeusec"]) * 1e-6 - is_ascii = param_dict["dataformat"] == "ASCII" - n_channels = int(param_dict["numchannels"]) - try: - dt_start = datetime( - int(param_dict["startyear"]), - int(param_dict["startmonth"]), - int(param_dict["startday"]), - int(param_dict["starthour"]), - int(param_dict["startmin"]), - int(param_dict["startsec"]), - int(param_dict["startmillisec"]) * 1000, - timezone.utc, - ) - # Note that the time zone information is not stored in the Curry info - # file, and it seems the start time info is in the local timezone - # of the acquisition system (which is unknown); therefore, just set - # the timezone to be UTC. If the user knows otherwise, they can - # change it later. (Some Curry files might include StartOffsetUTCMin, - # but its presence is unpredictable, so we won't rely on it.) 
- except (ValueError, KeyError): - dt_start = None # if missing keywords or illegal values, don't set - - if time_step == 0: - true_sfreq = sfreq - elif sfreq == 0: - true_sfreq = 1 / time_step - elif not np.isclose(sfreq, 1 / time_step): - raise ValueError(_msg_match) - else: # they're equal and != 0 - true_sfreq = sfreq - if true_sfreq <= 0: - raise ValueError(_msg_invalid.format(true_sfreq)) - - return CurryParameters( - n_samples, - true_sfreq, - is_ascii, - unit_dict, - n_channels, - dt_start, - chanidx_in_file, - ) +def _extract_curry_info(fname): + _soft_import("curryreader", "read file header") + import curryreader -def _read_curry_info(curry_paths): - """Extract info from curry parameter files.""" - curry_params = _read_curry_parameters(curry_paths["info"]) - R = np.eye(4) - R[[0, 1], [0, 1]] = -1 # rotate 180 deg - # shift down and back - # (chosen by eyeballing to make the CTF helmet look roughly correct) - R[:3, 3] = [0.0, -0.015, -0.12] - curry_dev_dev_t = Transform("ctf_meg", "meg", R) - - # read labels from label files - label_fname = curry_paths["labels"] - types = ["meg", "eeg", "misc"] - labels = _read_curry_lines( - label_fname, ["LABELS" + CHANTYPES[key] for key in types] - ) - sensors = _read_curry_lines( - label_fname, ["SENSORS" + CHANTYPES[key] for key in types] - ) - normals = _read_curry_lines( - label_fname, ["NORMALS" + CHANTYPES[key] for key in types] - ) - assert len(labels) == len(sensors) == len(normals) - - all_chans = list() - dig_ch_pos = dict() - for key in ["meg", "eeg", "misc"]: - chanidx_is_explicit = ( - len(curry_params.chanidx_in_file["CHAN_IN_FILE" + CHANTYPES[key]]) > 0 - ) # channel index - # position in the datafile may or may not be explicitly declared, - # based on the CHAN_IN_FILE section in info file - for ind, chan in enumerate(labels["LABELS" + CHANTYPES[key]]): - chanidx = len(all_chans) + 1 # by default, just assume the - # channel index in the datafile is in order of the channel - # names as we found them in the labels file - if chanidx_is_explicit: # but, if explicitly declared, use - # that index number - chanidx = int( - curry_params.chanidx_in_file["CHAN_IN_FILE" + CHANTYPES[key]][ind] - ) - if chanidx <= 0: # if chanidx was explicitly declared to be ' 0', - # it means the channel is not actually saved in the data file - # (e.g. the "Ref" channel), so don't add it to our list. - # Git issue #8391 - continue - ch = { - "ch_name": chan, - "unit": curry_params.unit_dict[key], - "kind": FIFFV_CHANTYPES[key], - "coil_type": FIFFV_COILTYPES[key], - "ch_idx": chanidx, - } - if key == "eeg": - loc = np.array(sensors["SENSORS" + CHANTYPES[key]][ind], float) - # XXX just the sensor, where is ref (next 3)? 
- assert loc.shape == (3,) - loc /= 1000.0 # to meters - loc = np.concatenate([loc, np.zeros(9)]) - ch["loc"] = loc - # XXX need to check/ensure this - ch["coord_frame"] = FIFF.FIFFV_COORD_HEAD - dig_ch_pos[chan] = loc[:3] - elif key == "meg": - pos = np.array(sensors["SENSORS" + CHANTYPES[key]][ind], float) - pos /= 1000.0 # to meters - pos = pos[:3] # just the inner coil - pos = apply_trans(curry_dev_dev_t, pos) - nn = np.array(normals["NORMALS" + CHANTYPES[key]][ind], float) - assert np.isclose(np.linalg.norm(nn), 1.0, atol=1e-4) - nn /= np.linalg.norm(nn) - nn = apply_trans(curry_dev_dev_t, nn, move=False) - trans = np.eye(4) - trans[:3, 3] = pos - trans[:3, :3] = _normal_orth(nn).T - ch["loc"] = _coil_trans_to_loc(trans) - ch["coord_frame"] = FIFF.FIFFV_COORD_DEVICE - all_chans.append(ch) - dig = _make_dig_points( - dig_ch_pos=dig_ch_pos, coord_frame="head", add_missing_fiducials=True - ) - del dig_ch_pos - - ch_count = len(all_chans) - assert ch_count == curry_params.n_chans # ensure that we have assembled - # the same number of channels as declared in the info (.DAP) file in the - # DATA_PARAMETERS section. Git issue #8391 - - # sort the channels to assure they are in the order that matches how - # recorded in the datafile. In general they most likely are already in - # the correct order, but if the channel index in the data file was - # explicitly declared we might as well use it. - all_chans = sorted(all_chans, key=lambda ch: ch["ch_idx"]) - - ch_names = [chan["ch_name"] for chan in all_chans] - info = create_info(ch_names, curry_params.sfreq) - with info._unlock(): - info["meas_date"] = curry_params.dt_start # for Git issue #8398 - info["dig"] = dig - _make_trans_dig(curry_paths, info, curry_dev_dev_t) - - for ind, ch_dict in enumerate(info["chs"]): - all_chans[ind].pop("ch_idx") - ch_dict.update(all_chans[ind]) - assert ch_dict["loc"].shape == (12,) - ch_dict["unit"] = SI_UNITS[all_chans[ind]["unit"][1]] - ch_dict["cal"] = SI_UNIT_SCALE[all_chans[ind]["unit"][0]] - - return info, curry_params.n_samples, curry_params.is_ascii - - -_card_dict = { - "Left ear": FIFF.FIFFV_POINT_LPA, - "Nasion": FIFF.FIFFV_POINT_NASION, - "Right ear": FIFF.FIFFV_POINT_RPA, -} - - -def _make_trans_dig(curry_paths, info, curry_dev_dev_t): - # Coordinate frame transformations and definitions - no_msg = "Leaving device<->head transform as None" - info["dev_head_t"] = None - label_fname = curry_paths["labels"] - key = "LANDMARKS" + CHANTYPES["meg"] - lm = _read_curry_lines(label_fname, [key])[key] - lm = np.array(lm, float) - lm.shape = (-1, 3) - if len(lm) == 0: - # no dig - logger.info(no_msg + " (no landmarks found)") - return - lm /= 1000.0 - key = "LM_REMARKS" + CHANTYPES["meg"] - remarks = _read_curry_lines(label_fname, [key])[key] - assert len(remarks) == len(lm) - with info._unlock(): - info["dig"] = list() - cards = dict() - for remark, r in zip(remarks, lm): - kind = ident = None - if remark in _card_dict: - kind = FIFF.FIFFV_POINT_CARDINAL - ident = _card_dict[remark] - cards[ident] = r - elif remark.startswith("HPI"): - kind = FIFF.FIFFV_POINT_HPI - ident = int(remark[3:]) - 1 - if kind is not None: - info["dig"].append( - dict(kind=kind, ident=ident, r=r, coord_frame=FIFF.FIFFV_COORD_UNKNOWN) - ) - with info._unlock(): - info["dig"].sort(key=lambda x: (x["kind"], x["ident"])) - has_cards = len(cards) == 3 - has_hpi = "hpi" in curry_paths - if has_cards and has_hpi: # have all three - logger.info("Composing device<->head transformation from dig points") - hpi_u = np.array( - [d["r"] for d in 
info["dig"] if d["kind"] == FIFF.FIFFV_POINT_HPI], float - ) - hpi_c = np.ascontiguousarray(_first_hpi(curry_paths["hpi"])[: len(hpi_u), 1:4]) - unknown_curry_t = _quaternion_align("unknown", "ctf_meg", hpi_u, hpi_c, 1e-2) - angle = np.rad2deg( - _angle_between_quats( - np.zeros(3), rot_to_quat(unknown_curry_t["trans"][:3, :3]) + # use curry-python-reader + currydata = curryreader.read(str(fname), plotdata=0, verbosity=1) + + # basic info + sfreq = currydata["info"]["samplingfreq"] + n_samples = currydata["info"]["samples"] + if n_samples != currydata["data"].shape[0]: # normal in epoched data + n_samples = currydata["data"].shape[0] + if _get_curry_recording_type(fname) == "raw": + warn( + "sample count from header doesn't match actual data! " + "file corrupted? will use data shape" ) + + # channel information + n_ch = currydata["info"]["channels"] + ch_names = currydata["labels"] + ch_pos = currydata["sensorpos"] + landmarks = currydata["landmarks"] + if not isinstance(landmarks, np.ndarray): + landmarks = np.array(landmarks) + landmarkslabels = currydata["landmarkslabels"] + hpimatrix = currydata["hpimatrix"] + + # data + orig_format = "int" + # curryreader.py always reads float32, but this is probably just numpy. + # legacy MNE code states int. + + # events + events = currydata["events"] + # annotations = currydata[ + # "annotations" + # ] # TODO - these dont really seem to correspond to events! what is it? + + # impedance measurements + # moved to standalone def; see read_impedances_curry + # impedances = currydata["impedances"] + + # get other essential info not provided by curryreader + fname_hdr = _check_curry_header_filename(fname) + + # channel types and units + ch_types, units = [], [] + ch_groups = fname_hdr.read_text().split("DEVICE_PARAMETERS")[1::2] + for ch_group in ch_groups: + ch_group = re.compile(r"\s+").sub(" ", ch_group).strip() + groupid = ch_group.split()[0] + unit = ch_group.split("DataUnit = ")[1].split()[0] + n_ch_group = int(ch_group.split("NumChanThisGroup = ")[1].split()[0]) + ch_type = ( + "mag" if ("MAG" in groupid) else "misc" if ("OTHER" in groupid) else "eeg" ) - dist = 1000 * np.linalg.norm(unknown_curry_t["trans"][:3, 3]) - logger.info(f" Fit a {angle:0.1f}° rotation, {dist:0.1f} mm translation") - unknown_dev_t = combine_transforms( - unknown_curry_t, curry_dev_dev_t, "unknown", "meg" - ) - unknown_head_t = Transform( - "unknown", - "head", - get_ras_to_neuromag_trans( - *( - cards[key] - for key in ( - FIFF.FIFFV_POINT_NASION, - FIFF.FIFFV_POINT_LPA, - FIFF.FIFFV_POINT_RPA, - ) - ) - ), + # combine info + ch_types += [ch_type] * n_ch_group + units += [unit] * n_ch_group + + # This for Git issue #8391. In some cases, the 'labels' (.rs3 file will + # list channels that are not actually saved in the datafile (such as the + # 'Ref' channel). These channels are denoted in the 'info' (.dap) file + # in the CHAN_IN_FILE section with a '0' as their index. + # + # current curryreader cannot cope with this - loads the list of channels solely + # based on their order, so can be false. fix it here! 
+ if not len(ch_types) == len(units) == len(ch_names) == n_ch: + # read relevant info + fname_lbl = _check_curry_labels_filename(fname) + lbl = fname_lbl.read_text().split("START_LIST") + ch_names_full = [] + for i in range(1, len(lbl)): + if "LABELS" in lbl[i - 1].split()[-1]: + for ll in lbl[i].split("\n")[1:]: + if "LABELS" not in ll: + ch_names_full.append(ll.strip()) + else: + break + hdr = fname_hdr.read_text().split("START_LIST") + chaninfile_full = [] + for i in range(1, len(hdr)): + if "CHAN_IN_FILE" in hdr[i - 1].split()[-1]: + for ll in hdr[i].split("\n")[1:]: + if "CHAN_IN_FILE" not in ll: + chaninfile_full.append(int(ll.strip())) + else: + break + # drop channels with chan_in_file==0, account for order + i_drop = [i for i, ich in enumerate(chaninfile_full) if ich == 0] + ch_names = [ + ch_names_full[i] for i in np.argsort(chaninfile_full) if i not in i_drop + ] + ch_pos = np.array( + [ + ch_pos[i] + for i in np.argsort(chaninfile_full) + if (i not in i_drop) and (i < len(ch_pos)) + ] ) - with info._unlock(): - info["dev_head_t"] = combine_transforms( - invert_transform(unknown_dev_t), unknown_head_t, "meg", "head" - ) - for d in info["dig"]: - d.update( - coord_frame=FIFF.FIFFV_COORD_HEAD, - r=apply_trans(unknown_head_t, d["r"]), - ) - else: - if has_cards: - no_msg += " (no .hpi file found)" - elif has_hpi: - no_msg += " (not all cardinal points found)" - else: - no_msg += " (neither cardinal points nor .hpi file found)" - logger.info(no_msg) - - -def _first_hpi(fname): - # Get the first HPI result - with open(fname) as fid: - for line in fid: - line = line.strip() - if any(x in line for x in ("FileVersion", "NumCoils")) or not line: - continue - hpi = np.array(line.split(), float) - break - else: - raise RuntimeError(f"Could not find valid HPI in {fname}") - # t is the first entry - assert hpi.ndim == 1 - hpi = hpi[1:] - hpi.shape = (-1, 5) - hpi /= 1000.0 - return hpi + ch_types = [ch_types[i] for i in np.argsort(chaninfile_full) if i not in i_drop] + units = [units[i] for i in np.argsort(chaninfile_full) if i not in i_drop] + assert len(ch_types) == len(units) == len(ch_names) == n_ch + assert len(ch_pos) == ch_types.count("eeg") + ch_types.count("mag") -def _read_events_curry(fname): - """Read events from Curry event files. + # finetune channel types (e.g. stim, eog etc might be identified by name) + # TODO? - Parameters - ---------- - fname : path-like - Path to a curry event file with extensions .cef, .ceo, - .cdt.cef, or .cdt.ceo + # scale data to SI units + orig_units = dict(zip(ch_names, units)) + cals = [ + 1.0 / 1e15 if (u == "fT") else 1.0 / 1e6 if (u == "uV") else 1.0 for u in units + ] - Returns - ------- - events : ndarray, shape (n_events, 3) - The array of events. - """ - check_fname( - fname, - "curry event", - (".cef", ".ceo", ".cdt.cef", ".cdt.ceo"), - endings_err=(".cef", ".ceo", ".cdt.cef", ".cdt.ceo"), + return ( + sfreq, + n_samples, + ch_names, + ch_types, + ch_pos, + landmarks, + landmarkslabels, + hpimatrix, + events, + orig_format, + orig_units, + cals, ) - events_dict = _read_curry_lines(fname, ["NUMBER_LIST"]) - # The first 3 column seem to contain the event information - curry_events = np.array(events_dict["NUMBER_LIST"], dtype=int)[:, 0:3] - - return curry_events - def _read_annotations_curry(fname, sfreq="auto"): r"""Read events from Curry event files. Parameters ---------- - fname : str + fname : path-like The filename. sfreq : float | 'auto' The sampling frequency in the file. 
If set to 'auto' then the - ``sfreq`` is taken from the respective info file of the same name with - according file extension (\*.dap for Curry 7; \*.cdt.dpa for Curry8). - So data.cef looks in data.dap and data.cdt.cef looks in data.cdt.dpa. + ``sfreq`` is taken from the fileheader. Returns ------- annot : instance of Annotations | None The annotations. """ - required = ["events", "info"] if sfreq == "auto" else ["events"] - curry_paths = _get_curry_file_structure(fname, required) - events = _read_events_curry(curry_paths["events"]) + fname = _check_curry_filename(fname) + (sfreq_fromfile, _, _, _, _, _, _, _, events, _, _, _) = _extract_curry_info(fname) if sfreq == "auto": - sfreq = _read_curry_parameters(curry_paths["info"]).sfreq + sfreq = sfreq_fromfile + elif np.isreal(sfreq): + if float(sfreq) != float(sfreq_fromfile): + warn( + f"provided sfreq ({sfreq} Hz) does not match freq from fileheader " + f"({sfreq_fromfile} Hz)!" + ) + else: + raise ValueError("'sfreq' must be numeric or 'auto'") - onset = events[:, 0] / sfreq - duration = np.zeros(events.shape[0]) - description = events[:, 2] + if isinstance(events, np.ndarray): # if there are events + events = events.astype("int") + events = np.insert(events, 1, np.diff(events[:, 2:]).flatten(), axis=1)[:, :3] + return annotations_from_events(events, sfreq) + else: + warn("no event annotations found") + return None + + +def _make_curry_montage(ch_names, ch_types, ch_pos, landmarks, landmarkslabels): + # scale ch_pos to m?! + ch_pos /= 1000.0 + landmarks /= 1000.0 + # channel locations + # what about misc without pos? can they mess things up if unordered? + assert len(ch_pos) == (ch_types.count("mag") + ch_types.count("eeg")) + ch_pos_eeg = { + ch_names[i]: ch_pos[i, :3] for i, t in enumerate(ch_types) if t == "eeg" + } + # landmarks and headshape + landmark_dict = dict(zip(landmarkslabels, landmarks)) + for k in ["Nas", "RPA", "LPA"]: + if k not in landmark_dict.keys(): + landmark_dict[k] = None + if len(landmarkslabels) > 0: + hpi_pos = landmarks[ + [i for i, n in enumerate(landmarkslabels) if re.match("HPI[1-99]", n)], : + ] + else: + hpi_pos = None + if len(landmarkslabels) > 0: + hsp_pos = landmarks[ + [i for i, n in enumerate(landmarkslabels) if re.match("H[1-99]", n)], : + ] + else: + hsp_pos = None + # make dig montage for eeg + mont = None + if ch_pos.shape[1] in [3, 6]: # eeg xyz space + mont = make_dig_montage( + ch_pos=ch_pos_eeg, + nasion=landmark_dict["Nas"], + lpa=landmark_dict["LPA"], + rpa=landmark_dict["RPA"], + hsp=hsp_pos, + hpi=hpi_pos, + coord_frame="unknown", + ) + else: # not recorded? + pass + + return mont + + +def _set_chanloc_curry(inst, ch_types, ch_pos, landmarks, landmarkslabels): + ch_names = inst.info["ch_names"] + + # scale ch_pos to m?! + ch_pos /= 1000.0 + landmarks /= 1000.0 + # channel locations + # what about misc without pos? can they mess things up if unordered?
+ assert len(ch_pos) == (ch_types.count("mag") + ch_types.count("eeg")) + ch_pos_meg = { + ch_names[i]: ch_pos[i, :3] for i, t in enumerate(ch_types) if t == "mag" + } + ch_pos_eeg = { + ch_names[i]: ch_pos[i, :3] for i, t in enumerate(ch_types) if t == "eeg" + } + + # landmarks and headshape + landmark_dict = dict(zip(landmarkslabels, landmarks)) + for k in ["Nas", "RPA", "LPA"]: + if k not in landmark_dict.keys(): + landmark_dict[k] = None + if len(landmarkslabels) > 0: + hpi_pos = landmarks[ + [i for i, n in enumerate(landmarkslabels) if re.match("HPI.?[1-99]", n)], + :, + ] + else: + hpi_pos = None + if len(landmarkslabels) > 0: + hsp_pos = landmarks[ + [i for i, n in enumerate(landmarkslabels) if re.match("H.?[1-99]", n)], : + ] + else: + hsp_pos = None + + add_missing_fiducials = ( + landmark_dict["Nas"] is None + and landmark_dict["LPA"] is None + and landmark_dict["RPA"] is None + ) # _make_dig_points raises otherwise + dig = _make_dig_points( + nasion=landmark_dict["Nas"], + lpa=landmark_dict["LPA"], + rpa=landmark_dict["RPA"], + hpi=hpi_pos, + extra_points=hsp_pos, + dig_ch_pos=ch_pos_eeg, + coord_frame="head", + add_missing_fiducials=add_missing_fiducials, + ) + with inst.info._unlock(): + inst.info["dig"] = dig + + # loc transformation for meg sensors (taken from previous version) + if len(ch_pos_meg) > 0: + R = np.eye(4) + R[[0, 1], [0, 1]] = -1 # rotate 180 deg + # shift down and back + # (chosen by eyeballing to make the CTF helmet look roughly correct) + R[:3, 3] = [0.0, -0.015, -0.12] + curry_dev_dev_t = Transform("ctf_meg", "meg", R) + + ch_normals_meg = _get_curry_meg_normals(inst.filenames[0]) + assert len(ch_normals_meg) == len(ch_pos_meg) + else: + curry_dev_dev_t, ch_normals_meg = None, None + # fill up chanlocs + assert len(ch_names) == len(ch_types) >= len(ch_pos) + for i, (ch_name, ch_type, ch_loc) in enumerate(zip(ch_names, ch_types, ch_pos)): + assert inst.info["ch_names"][i] == ch_name + ch = inst.info["chs"][i] + if ch_type == "eeg": + with inst.info._unlock(): + ch["loc"][:3] = ch_loc[:3] + ch["coord_frame"] = FIFF.FIFFV_COORD_HEAD + elif ch_type == "mag": + # transform mode + pos = ch_loc[:3] # just the inner coil for MEG + pos = apply_trans(curry_dev_dev_t, pos) + nn = ch_normals_meg[i] + assert np.isclose(np.linalg.norm(nn), 1.0, atol=1e-4) + nn /= np.linalg.norm(nn) + nn = apply_trans(curry_dev_dev_t, nn, move=False) + trans = np.eye(4) + trans[:3, 3] = pos + trans[:3, :3] = _normal_orth(nn).T + with inst.info._unlock(): + ch["loc"] = _coil_trans_to_loc(trans) + ch["coord_frame"] = FIFF.FIFFV_COORD_DEVICE + elif ch_type == "misc": + pass + else: + raise NotImplementedError - return Annotations(onset, duration, description) + # _make_trans_dig(curry_paths, inst.info, curry_dev_dev_t) # TODO - necessary?! @verbose -def read_raw_curry(fname, preload=False, verbose=None) -> "RawCurry": +def read_raw_curry( + fname, import_epochs_as_events=False, preload=False, verbose=None +) -> "RawCurry": """Read raw data from Curry files. Parameters @@ -547,6 +546,9 @@ fname : path-like Path to a curry file with extensions ``.dat``, ``.dap``, ``.rs3``, ``.cdt``, ``.cdt.dpa``, ``.cdt.cef`` or ``.cef``. + import_epochs_as_events : bool + Set to ``True`` if you want to import epoched recordings as a continuous ``Raw``
Only do this if you know your data allows it. %(preload)s %(verbose)s @@ -560,7 +562,26 @@ def read_raw_curry(fname, preload=False, verbose=None) -> "RawCurry": -------- mne.io.Raw : Documentation of attributes and methods of RawCurry. """ - return RawCurry(fname, preload, verbose) + fname = _check_curry_filename(fname) + rectype = _get_curry_recording_type(fname) + + inst = RawCurry(fname, preload, verbose) + if rectype in ["epochs", "evoked"]: + curry_epoch_info = _get_curry_epoch_info(fname) + if import_epochs_as_events: + epoch_annotations = annotations_from_events( + events=curry_epoch_info["events"], + event_desc={v: k for k, v in curry_epoch_info["event_id"].items()}, + sfreq=inst.info["sfreq"], + ) + inst.set_annotations(inst.annotations + epoch_annotations) + else: + inst = Epochs( + inst, **curry_epoch_info + ) # TODO - seems to reject flat channel + if rectype == "evoked": + raise NotImplementedError + return inst class RawCurry(BaseRaw): @@ -582,38 +603,86 @@ class RawCurry(BaseRaw): @verbose def __init__(self, fname, preload=False, verbose=None): - curry_paths = _get_curry_file_structure( - fname, required=["info", "data", "labels"] - ) - - data_fname = op.abspath(curry_paths["data"]) - - info, n_samples, is_ascii = _read_curry_info(curry_paths) - + fname = _check_curry_filename(fname) + + ( + sfreq, + n_samples, + ch_names, + ch_types, + ch_pos, + landmarks, + landmarkslabels, + hpimatrix, + events, + orig_format, + orig_units, + cals, + ) = _extract_curry_info(fname) + + meas_date, is_ascii, device_info = _get_curry_meas_info(fname) + + # construct info + info = create_info(ch_names=ch_names, sfreq=sfreq, ch_types=ch_types) + info["device_info"] = device_info + + # create raw object last_samps = [n_samples - 1] raw_extras = dict(is_ascii=is_ascii) - super().__init__( info, - preload, - filenames=[data_fname], + preload=False, + filenames=[fname], last_samps=last_samps, - orig_format="int", + orig_format=orig_format, raw_extras=[raw_extras], + orig_units=orig_units, verbose=verbose, ) - if "events" in curry_paths: - logger.info( - "Event file found. Extracting Annotations from " - f"{curry_paths['events']}..." - ) - annots = _read_annotations_curry( - curry_paths["events"], sfreq=self.info["sfreq"] - ) - self.set_annotations(annots) - else: - logger.info("Event file not found. No Annotations set.") + # set meas_date + self.set_meas_date(meas_date) + + # scale data to SI units + self._cals = np.array(cals) + if isinstance(preload, bool | np.bool_) and preload: + self.load_data() + + # set events / annotations + # format from curryreader: sample, etype, startsample, endsample + if isinstance(events, np.ndarray): # if there are events + events = events.astype("int") + events = np.insert(events, 1, np.diff(events[:, 2:]).flatten(), axis=1)[ + :, :3 + ] + annot = annotations_from_events(events, sfreq) + self.set_annotations(annot) + + # add sensor locations + # TODO - review wanted! + assert len(self.info["ch_names"]) == len(ch_types) >= len(ch_pos) + _set_chanloc_curry( + inst=self, + ch_types=ch_types, + ch_pos=ch_pos, + landmarks=landmarks, + landmarkslabels=landmarkslabels, + ) + + # add HPI data (if present) + # from curryreader docstring: + # "HPI-coil measurements matrix (Orion-MEG only) where every row is: + # [measurementsample, dipolefitflag, x, y, z, deviation]" + # that's incorrect, though. it seems to be: + # [sample, dipole_1, x_1,y_1, z_1, dev_1, ..., dipole_n, x_n, ...] + # for all n coils. + # TODO - do they actually store cHPI? 
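+        # e.g. with two coils a row would read:
+        # [sample, dipole_1, x_1, y_1, z_1, dev_1, dipole_2, x_2, y_2, z_2, dev_2],
+        # i.e. 1 + 5 * n_coil columns, which is what the reshape below assumes.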
+ if not isinstance(hpimatrix, list): + warn("cHPI data found, but reader not implemented.") + hpisamples = hpimatrix[:, 0] + n_coil = int((hpimatrix.shape[1] - 1) / 5) + hpimatrix = hpimatrix[:, 1:].reshape(hpimatrix.shape[0], n_coil, 5) + print(f"found {len(hpisamples)} cHPI samples for {n_coil} coils") def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): """Read a chunk of raw data.""" @@ -629,3 +698,73 @@ def _read_segment_file(self, data, idx, fi, start, stop, cals, mult): _read_segments_file( self, data, idx, fi, start, stop, cals, mult, dtype=" 100 +def test_read_raw_curry_preload_equal(fname): + """Test raw identity with preload=True/False.""" + raw1 = read_raw_curry(fname, preload=False) + raw1.load_data() + assert raw1 == read_raw_curry(fname, preload=True) @testing.requires_testing_data @@ -300,11 +134,10 @@ def test_read_events_curry_are_same_as_bdf(fname): """Test events from curry annotations recovers the right events.""" EVENT_ID = {str(ii): ii for ii in range(5)} REF_EVENTS = find_events(read_raw_bdf(bdf_file, preload=True)) - raw = read_raw_curry(fname) events, _ = events_from_annotations(raw, event_id=EVENT_ID) assert_allclose(events, REF_EVENTS) - assert raw.info["dev_head_t"] is None + assert raw.info["dev_head_t"] == Transform("meg", "head") @testing.requires_testing_data @@ -312,72 +145,17 @@ def test_check_missing_files(): """Test checking for missing curry files (smoke test).""" invalid_fname = "/invalid/path/name.xy" - with pytest.raises(OSError, match="file type .*? must end with"): - _read_events_curry(invalid_fname) - - with pytest.raises(FileNotFoundError, match="does not exist"): - _get_curry_file_structure(invalid_fname) - - with pytest.raises(FileNotFoundError, match="files cannot be found"): - _get_curry_file_structure(missing_event_file, required=["info", "events"]) - - -def _mock_info_file(src, dst, sfreq, time_step): - with open(src) as in_file, open(dst, "w") as out_file: - for line in in_file: - if "SampleFreqHz" in line: - out_file.write(line.replace("500", str(sfreq))) - elif "SampleTimeUsec" in line: - out_file.write(line.replace("2000", str(time_step))) - else: - out_file.write(line) + with pytest.raises(FileNotFoundError, match="no curry data file"): + _check_curry_filename(invalid_fname) + with pytest.raises(FileNotFoundError, match="no corresponding header"): + _check_curry_header_filename(invalid_fname) -@pytest.fixture( - params=[ - pytest.param(dict(sfreq=500, time_step=0), id="correct sfreq"), - pytest.param(dict(sfreq=0, time_step=2000), id="correct time_step"), - pytest.param(dict(sfreq=500, time_step=2000), id="both correct"), - pytest.param( - dict(sfreq=0, time_step=0), - id="both 0", - marks=pytest.mark.xfail(raises=ValueError), - ), - pytest.param( - dict(sfreq=500, time_step=42), - id="mismatch", - marks=pytest.mark.xfail(raises=ValueError), - ), - ] -) -def sfreq_testing_data(tmp_path, request): - """Generate different sfreq, time_step scenarios to be tested.""" - sfreq, time_step = request.param["sfreq"], request.param["time_step"] - - # create dummy empty files for 'dat' and 'rs3' - for fname in ["curry.dat", "curry.rs3"]: - open(tmp_path / fname, "a").close() - - _mock_info_file( - src=curry7_bdf_file.with_suffix(".dap"), - dst=tmp_path / "curry.dap", - sfreq=sfreq, - time_step=time_step, - ) - _mock_info_file( - src=curry7_bdf_file.with_suffix(".rs3"), - dst=tmp_path / "curry.rs3", - sfreq=sfreq, - time_step=time_step, - ) - return tmp_path / "curry.dat" - + with pytest.raises(FileNotFoundError, match="no curry 
data file"): + read_raw_curry(invalid_fname) -@testing.requires_testing_data -def test_sfreq(sfreq_testing_data): - """Test sfreq and time_step.""" - raw = read_raw_curry(sfreq_testing_data, preload=False) - assert raw.info["sfreq"] == 500 + with pytest.raises(FileNotFoundError, match="no curry data file"): + read_impedances_curry(invalid_fname) @testing.requires_testing_data @@ -385,13 +163,9 @@ def test_sfreq(sfreq_testing_data): "fname", [ pytest.param(curry_dir / "test_bdf_stim_channel Curry 7.cef", id="7"), - pytest.param(curry_dir / "test_bdf_stim_channel Curry 8.cdt.cef", id="8"), pytest.param( curry_dir / "test_bdf_stim_channel Curry 7 ASCII.cef", id="7 ascii" ), - pytest.param( - curry_dir / "test_bdf_stim_channel Curry 8 ASCII.cdt.cef", id="8 ascii" - ), ], ) def test_read_curry_annotations(fname): @@ -445,94 +219,10 @@ def test_read_curry_annotations(fname): assert_array_equal(annot.duration, EXPECTED_DURATION) assert_array_equal(annot.description, EXPECTED_DESCRIPTION) - -def _get_read_annotations_mock_info(name_part, mock_dir): - original, modified = dict(), dict() - - original["event"] = curry_dir / ("test_bdf_stim_channel " + name_part) - original["base"], ext = str(original["event"]).split(".", maxsplit=1) - original["base"] = Path(original["base"]) - version = _get_curry_version(ext) - original["info"] = original["base"].with_suffix(FILE_EXTENSIONS[version]["info"]) - - modified["base"] = mock_dir / "curry" - modified["event"] = modified["base"].with_suffix( - FILE_EXTENSIONS[version]["events_cef"] - ) - modified["info"] = modified["base"].with_suffix(FILE_EXTENSIONS[version]["info"]) - - return original, modified - - -@testing.requires_testing_data -@pytest.mark.parametrize( - "name_part", - [ - pytest.param("7.cef", id="7"), - pytest.param("8.cdt.cef", id="8"), - pytest.param("7 ASCII.cef", id="7 (ascii)"), - pytest.param("8 ASCII.cdt.cef", id="8 (ascii)"), - ], -) -def test_read_curry_annotations_using_mocked_info(tmp_path, name_part): - """Test reading for Curry events file.""" - EXPECTED_ONSET = [ - 0.484, - 0.486, - 0.62, - 0.622, - 1.904, - 1.906, - 3.212, - 3.214, - 4.498, - 4.5, - 5.8, - 5.802, - 7.074, - 7.076, - 8.324, - 8.326, - 9.58, - 9.582, - ] - EXPECTED_DURATION = np.zeros_like(EXPECTED_ONSET) - EXPECTED_DESCRIPTION = [ - "4", - "50000", - "2", - "50000", - "1", - "50000", - "1", - "50000", - "1", - "50000", - "1", - "50000", - "1", - "50000", - "1", - "50000", - "1", - "50000", - ] - - original, fname = _get_read_annotations_mock_info("Curry " + name_part, tmp_path) - copyfile(src=original["event"], dst=fname["event"]) - - _msg = "required files cannot be found" - with pytest.raises(FileNotFoundError, match=_msg): - read_annotations(fname["event"], sfreq="auto") - - _mock_info_file(src=original["info"], dst=fname["info"], sfreq=0, time_step=2000) - - annot = read_annotations(fname["event"], sfreq="auto") - - assert annot.orig_time is None - assert_array_equal(annot.onset, EXPECTED_ONSET) - assert_array_equal(annot.duration, EXPECTED_DURATION) - assert_array_equal(annot.description, EXPECTED_DESCRIPTION) + with pytest.raises(ValueError, match="must be numeric or 'auto'"): + _ = read_annotations(fname, sfreq="nonsense") + with pytest.warns(RuntimeWarning, match="does not match freq from fileheader"): + _ = read_annotations(fname, sfreq=12.0) @testing.requires_testing_data @@ -565,45 +255,47 @@ def test_read_files_missing_channel(fname, expected_channel_list): assert raw.ch_names == expected_channel_list +@testing.requires_testing_data +def 
test_read_device_info(): + """Test extraction of device_info.""" + raw = read_raw_curry(curry7_bdf_file) + assert not raw.info["device_info"] + raw2 = read_raw_curry(Ref_chan_omitted_file) + assert isinstance(raw2.info["device_info"], dict) + + @testing.requires_testing_data @pytest.mark.parametrize( - "fname,expected_meas_date", + "fname", [ - pytest.param( - Ref_chan_omitted_file, - datetime(2018, 11, 21, 12, 53, 48, 525000, tzinfo=timezone.utc), - id="valid start date", - ), - pytest.param(curry7_rfDC_file, None, id="start date year is 0"), - pytest.param(curry7_bdf_file, None, id="start date seconds invalid"), + pytest.param(curry7_bdf_file, id="curry 7"), + pytest.param(curry8_bdf_file, id="curry 8"), + pytest.param(curry7_bdf_ascii_file, id="curry 7 ascii"), + pytest.param(curry8_bdf_ascii_file, id="curry 8 ascii"), ], ) -def test_meas_date(fname, expected_meas_date): - """Test reading acquisition start datetime info info['meas_date'].""" - # This for Git issue #8398. The 'info' (.dap) file includes acquisition - # start date & time. Test that this goes into raw.info['meas_date']. - # If the information is not valid, raw.info['meas_date'] should be None - raw = read_raw_curry(fname, preload=False) - assert raw.info["meas_date"] == expected_meas_date +def test_read_impedances_curry(fname): + """Test reading impedances from CURRY files.""" + _, imp = read_impedances_curry(fname) + actual_imp = empty(shape=(0, 3)) + assert_allclose( + imp, + actual_imp, + ) @testing.requires_testing_data @pytest.mark.parametrize( - "fname, others", + "fname", [ - pytest.param(curry7_rfDC_file, (".dap", ".rs3"), id="curry7"), - pytest.param(curry8_rfDC_file, (".cdt.dpa",), id="curry8"), + pytest.param(curry7_bdf_file, id="curry 7"), + pytest.param(curry8_bdf_file, id="curry 8"), + pytest.param(curry7_bdf_ascii_file, id="curry 7 ascii"), + pytest.param(curry8_bdf_ascii_file, id="curry 8 ascii"), ], ) -def test_dot_names(fname, others, tmp_path): - """Test that dots are parsed properly (e.g., in paths).""" - my_path = tmp_path / "dot.dot.dot" - my_path.mkdir() - my_path = my_path / Path(fname).parts[-1] - fname = Path(fname) - copyfile(fname, my_path) - for ext in others: - this_fname = fname.with_suffix(ext) - to_fname = my_path.with_suffix(ext) - copyfile(this_fname, to_fname) - read_raw_curry(my_path) +def test_read_montage_curry(fname): + """Test reading montage from CURRY files.""" + mont = read_montage_curry(fname) + assert isinstance(mont, DigMontage) + # TODO - not very specific, yet diff --git a/mne/utils/config.py b/mne/utils/config.py index af5bfc6714a..9ed1c8eb168 100644 --- a/mne/utils/config.py +++ b/mne/utils/config.py @@ -837,6 +837,7 @@ def sys_info( "neo", "eeglabio", "edfio", + "curryreader", "mffpy", "pybv", "antio", diff --git a/pyproject.toml b/pyproject.toml index 04683b47507..29ba8a6aab0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -88,6 +88,7 @@ full = ["mne[full-no-qt]", "PyQt6 != 6.6.0", "PyQt6-Qt6 != 6.6.0, != 6.7.0"] # and mne[full-pyside6], which will install PySide6 instead of PyQt6. 
full-no-qt = [ "antio >= 0.5.0", + "curryreader >= 0.1.1", "darkdetect", "defusedxml", "dipy", @@ -159,6 +160,7 @@ test = [ # Dependencies for being able to run additional tests (rare/CIs/advanced devs) # Changes here should be reflected in the mne/utils/config.py dev dependencies section test_extra = [ + "curryreader >= 0.1.1", "edfio >= 0.2.1", "eeglabio", "imageio >= 2.6.1", diff --git a/tools/circleci_dependencies.sh b/tools/circleci_dependencies.sh index 1d5c1da28cc..6a4eacd1f03 100755 --- a/tools/circleci_dependencies.sh +++ b/tools/circleci_dependencies.sh @@ -9,7 +9,7 @@ python -m pip install --upgrade --progress-bar off \ "git+https://github.com/mne-tools/mne-bids.git" \ "git+https://github.com/mne-tools/mne-qt-browser.git" \ \ - alphaCSC autoreject bycycle conpy emd fooof meggie \ + alphaCSC autoreject bycycle conpy curryreader emd fooof meggie \ mne-ari mne-bids-pipeline mne-faster mne-features \ mne-icalabel mne-lsl mne-microstates mne-nirs mne-rsa \ neurodsp neurokit2 niseq nitime pactools mnelab \ diff --git a/tools/install_pre_requirements.sh b/tools/install_pre_requirements.sh index faa35139047..75d5ee3c5e3 100755 --- a/tools/install_pre_requirements.sh +++ b/tools/install_pre_requirements.sh @@ -58,7 +58,8 @@ python -m pip install $STD_ARGS \ git+https://github.com/BUNPC/pysnirf2 \ git+https://github.com/the-siesta-group/edfio \ trame trame-vtk trame-vuetify jupyter ipyevents ipympl openmeeg \ - imageio-ffmpeg xlrd mffpy traitlets pybv eeglabio defusedxml antio + imageio-ffmpeg xlrd mffpy traitlets pybv eeglabio defusedxml antio \ + curryreader echo "::endgroup::" echo "::group::Make sure we're on a NumPy 2.0 variant"
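For reference, a standalone sketch (toy numbers) of the event-to-annotation conversion that `_read_annotations_curry` and `RawCurry.__init__` above perform; `annotations_from_events` is the public MNE helper they call:

```python
import numpy as np
from mne import annotations_from_events

sfreq = 500.0
# curryreader delivers events as [sample, etype, startsample, endsample]
events = np.array([[50, 1, 50, 60], [300, 2, 300, 300]])

# insert the duration (endsample - startsample), keep [sample, duration, etype]
events = np.insert(events, 1, np.diff(events[:, 2:]).flatten(), axis=1)[:, :3]

# annotations_from_events only uses columns 0 (onset sample) and 2 (id), so the
# inserted duration column is ignored and the annotations get zero duration
annot = annotations_from_events(events, sfreq)
print(annot.onset)        # [0.1 0.6]
print(annot.description)  # ['1' '2']
```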