From 05b978189c535a33c2bf580a5653a5418233db50 Mon Sep 17 00:00:00 2001 From: myd7349 Date: Sat, 17 May 2025 22:51:03 +0800 Subject: [PATCH 1/2] FIX: Read Nihon Kohden annotation file accurately Fix #11267. --- doc/changes/devel/13251.bugfix.rst | 1 + mne/io/nihon/nihon.py | 65 ++++++++++++++++++++++++------ 2 files changed, 54 insertions(+), 12 deletions(-) create mode 100644 doc/changes/devel/13251.bugfix.rst diff --git a/doc/changes/devel/13251.bugfix.rst b/doc/changes/devel/13251.bugfix.rst new file mode 100644 index 00000000000..a47faf00f02 --- /dev/null +++ b/doc/changes/devel/13251.bugfix.rst @@ -0,0 +1 @@ +Read Nihon Kohden annotation file accurately, by `Tom Ma`_. diff --git a/mne/io/nihon/nihon.py b/mne/io/nihon/nihon.py index d6c992dd563..8390426cb5d 100644 --- a/mne/io/nihon/nihon.py +++ b/mne/io/nihon/nihon.py @@ -276,6 +276,41 @@ def _read_nihon_header(fname): return header +def _read_event_log_block(fid, t_block, device_type): + fid.seek(0x92 + t_block * 20) + t_blk_address = np.fromfile(fid, np.uint32, 1)[0] + if t_blk_address == 0: + return + + fid.seek(t_blk_address + 0x1) + data_name = np.fromfile(fid, "|S16", 1).astype("U16")[0] + if data_name != device_type: + return + + fid.seek(t_blk_address + 0x12) + n_logs = np.fromfile(fid, np.uint8, 1)[0] + fid.seek(t_blk_address + 0x14) + t_logs = np.fromfile(fid, "|S45", n_logs) + return t_logs + + +def _parse_event_log(event_log): + t_desc = event_log[:20] + hour, minute, second = ( + int(event_log[20:22]), + int(event_log[22:24]), + int(event_log[24:26]), + ) + t_onset = hour * 3600 + minute * 60 + second + return t_desc, t_onset + + +def _parse_sub_event_log(sub_event_log): + t_sub_desc = sub_event_log[:20] + t_sub_onset = int(sub_event_log[24:30]) / 1e6 + return t_sub_desc, t_sub_onset + + def _read_nihon_annotations(fname): fname = _ensure_path(fname) log_fname = fname.with_suffix(".LOG") @@ -292,27 +327,33 @@ def _read_nihon_annotations(fname): n_logblocks = np.fromfile(fid, np.uint8, 1)[0] all_onsets = [] all_descriptions = [] + may_have_sub_blocks = n_logblocks <= 21 for t_block in range(n_logblocks): - fid.seek(0x92 + t_block * 20) - t_blk_address = np.fromfile(fid, np.uint32, 1)[0] - fid.seek(t_blk_address + 0x12) - n_logs = np.fromfile(fid, np.uint8, 1)[0] - fid.seek(t_blk_address + 0x14) - t_logs = np.fromfile(fid, "|S45", n_logs) - for t_log in t_logs: + t_logs = _read_event_log_block(fid, t_block, version) + t_sub_logs = None + if may_have_sub_blocks: + t_sub_logs = _read_event_log_block(fid, t_block + 22, version) + assert t_sub_logs is None or len(t_logs) == len(t_sub_logs) + + for i, t_log in enumerate(t_logs): + t_desc, t_onset = _parse_event_log(t_log) + if t_sub_logs is not None: + t_sub_desc, t_sub_onset = _parse_sub_event_log(t_sub_logs[i]) + t_desc += t_sub_desc + t_onset += t_sub_onset + + t_desc = t_desc.rstrip(b"\x00") for enc in _encodings: try: - t_log = t_log.decode(enc) + t_desc = t_desc.decode(enc) except UnicodeDecodeError: pass else: break else: - warn(f"Could not decode log as one of {_encodings}") + warn(f"Could not decode log {t_desc} as one of {_encodings}") continue - t_desc = t_log[:20].strip("\x00") - t_onset = datetime.strptime(t_log[20:26], "%H%M%S") - t_onset = t_onset.hour * 3600 + t_onset.minute * 60 + t_onset.second + all_onsets.append(t_onset) all_descriptions.append(t_desc) From df1d550482f4d9d0ad207d8ce5f0ab4d4fbef781 Mon Sep 17 00:00:00 2001 From: myd7349 Date: Sun, 18 May 2025 18:42:59 +0800 Subject: [PATCH 2/2] Check for sub event more carefully --- mne/io/nihon/nihon.py | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/mne/io/nihon/nihon.py b/mne/io/nihon/nihon.py index 8390426cb5d..6f3b35018a5 100644 --- a/mne/io/nihon/nihon.py +++ b/mne/io/nihon/nihon.py @@ -276,22 +276,26 @@ def _read_nihon_header(fname): return header -def _read_event_log_block(fid, t_block, device_type): +def _read_event_log_block(fid, t_block, version): fid.seek(0x92 + t_block * 20) - t_blk_address = np.fromfile(fid, np.uint32, 1)[0] - if t_blk_address == 0: + data = np.fromfile(fid, np.uint32, 1) + if data.size == 0 or data[0] == 0: return + t_blk_address = data[0] fid.seek(t_blk_address + 0x1) - data_name = np.fromfile(fid, "|S16", 1).astype("U16")[0] - if data_name != device_type: + data = np.fromfile(fid, "|S16", 1).astype("U16") + if data.size == 0 or data[0] != version: return fid.seek(t_blk_address + 0x12) - n_logs = np.fromfile(fid, np.uint8, 1)[0] + data = np.fromfile(fid, np.uint8, 1) + if data.size == 0: + return + n_logs = data[0] + fid.seek(t_blk_address + 0x14) - t_logs = np.fromfile(fid, "|S45", n_logs) - return t_logs + return np.fromfile(fid, "|S45", n_logs) def _parse_event_log(event_log): @@ -333,11 +337,10 @@ def _read_nihon_annotations(fname): t_sub_logs = None if may_have_sub_blocks: t_sub_logs = _read_event_log_block(fid, t_block + 22, version) - assert t_sub_logs is None or len(t_logs) == len(t_sub_logs) for i, t_log in enumerate(t_logs): t_desc, t_onset = _parse_event_log(t_log) - if t_sub_logs is not None: + if t_sub_logs is not None and t_sub_logs.size == t_logs.size: t_sub_desc, t_sub_onset = _parse_sub_event_log(t_sub_logs[i]) t_desc += t_sub_desc t_onset += t_sub_onset @@ -351,7 +354,7 @@ def _read_nihon_annotations(fname): else: break else: - warn(f"Could not decode log {t_desc} as one of {_encodings}") + warn(f"Could not decode log as one of {_encodings}") continue all_onsets.append(t_onset)