diff --git a/doc/changes/devel/13251.bugfix.rst b/doc/changes/devel/13251.bugfix.rst new file mode 100644 index 00000000000..a47faf00f02 --- /dev/null +++ b/doc/changes/devel/13251.bugfix.rst @@ -0,0 +1 @@ +Read Nihon Kohden annotation file accurately, by `Tom Ma`_. diff --git a/mne/io/nihon/nihon.py b/mne/io/nihon/nihon.py index d6c992dd563..6f3b35018a5 100644 --- a/mne/io/nihon/nihon.py +++ b/mne/io/nihon/nihon.py @@ -276,6 +276,45 @@ def _read_nihon_header(fname): return header +def _read_event_log_block(fid, t_block, version): + fid.seek(0x92 + t_block * 20) + data = np.fromfile(fid, np.uint32, 1) + if data.size == 0 or data[0] == 0: + return + t_blk_address = data[0] + + fid.seek(t_blk_address + 0x1) + data = np.fromfile(fid, "|S16", 1).astype("U16") + if data.size == 0 or data[0] != version: + return + + fid.seek(t_blk_address + 0x12) + data = np.fromfile(fid, np.uint8, 1) + if data.size == 0: + return + n_logs = data[0] + + fid.seek(t_blk_address + 0x14) + return np.fromfile(fid, "|S45", n_logs) + + +def _parse_event_log(event_log): + t_desc = event_log[:20] + hour, minute, second = ( + int(event_log[20:22]), + int(event_log[22:24]), + int(event_log[24:26]), + ) + t_onset = hour * 3600 + minute * 60 + second + return t_desc, t_onset + + +def _parse_sub_event_log(sub_event_log): + t_sub_desc = sub_event_log[:20] + t_sub_onset = int(sub_event_log[24:30]) / 1e6 + return t_sub_desc, t_sub_onset + + def _read_nihon_annotations(fname): fname = _ensure_path(fname) log_fname = fname.with_suffix(".LOG") @@ -292,17 +331,24 @@ def _read_nihon_annotations(fname): n_logblocks = np.fromfile(fid, np.uint8, 1)[0] all_onsets = [] all_descriptions = [] + may_have_sub_blocks = n_logblocks <= 21 for t_block in range(n_logblocks): - fid.seek(0x92 + t_block * 20) - t_blk_address = np.fromfile(fid, np.uint32, 1)[0] - fid.seek(t_blk_address + 0x12) - n_logs = np.fromfile(fid, np.uint8, 1)[0] - fid.seek(t_blk_address + 0x14) - t_logs = np.fromfile(fid, "|S45", n_logs) - for t_log in t_logs: + t_logs = _read_event_log_block(fid, t_block, version) + t_sub_logs = None + if may_have_sub_blocks: + t_sub_logs = _read_event_log_block(fid, t_block + 22, version) + + for i, t_log in enumerate(t_logs): + t_desc, t_onset = _parse_event_log(t_log) + if t_sub_logs is not None and t_sub_logs.size == t_logs.size: + t_sub_desc, t_sub_onset = _parse_sub_event_log(t_sub_logs[i]) + t_desc += t_sub_desc + t_onset += t_sub_onset + + t_desc = t_desc.rstrip(b"\x00") for enc in _encodings: try: - t_log = t_log.decode(enc) + t_desc = t_desc.decode(enc) except UnicodeDecodeError: pass else: @@ -310,9 +356,7 @@ def _read_nihon_annotations(fname): else: warn(f"Could not decode log as one of {_encodings}") continue - t_desc = t_log[:20].strip("\x00") - t_onset = datetime.strptime(t_log[20:26], "%H%M%S") - t_onset = t_onset.hour * 3600 + t_onset.minute * 60 + t_onset.second + all_onsets.append(t_onset) all_descriptions.append(t_desc)