Skip to content

FIX: Read Nihon Kohden annotation file accurately #13251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/devel/13251.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Read Nihon Kohden annotation file accurately, by `Tom Ma`_.
66 changes: 55 additions & 11 deletions mne/io/nihon/nihon.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,45 @@ def _read_nihon_header(fname):
return header


def _read_event_log_block(fid, t_block, version):
fid.seek(0x92 + t_block * 20)
data = np.fromfile(fid, np.uint32, 1)
if data.size == 0 or data[0] == 0:
return
t_blk_address = data[0]

fid.seek(t_blk_address + 0x1)
data = np.fromfile(fid, "|S16", 1).astype("U16")
if data.size == 0 or data[0] != version:
return

fid.seek(t_blk_address + 0x12)
data = np.fromfile(fid, np.uint8, 1)
if data.size == 0:
return
n_logs = data[0]

fid.seek(t_blk_address + 0x14)
return np.fromfile(fid, "|S45", n_logs)


def _parse_event_log(event_log):
t_desc = event_log[:20]
hour, minute, second = (
int(event_log[20:22]),
int(event_log[22:24]),
int(event_log[24:26]),
)
t_onset = hour * 3600 + minute * 60 + second
return t_desc, t_onset


def _parse_sub_event_log(sub_event_log):
t_sub_desc = sub_event_log[:20]
t_sub_onset = int(sub_event_log[24:30]) / 1e6
return t_sub_desc, t_sub_onset


def _read_nihon_annotations(fname):
fname = _ensure_path(fname)
log_fname = fname.with_suffix(".LOG")
Expand All @@ -292,27 +331,32 @@ def _read_nihon_annotations(fname):
n_logblocks = np.fromfile(fid, np.uint8, 1)[0]
all_onsets = []
all_descriptions = []
may_have_sub_blocks = n_logblocks <= 21
for t_block in range(n_logblocks):
fid.seek(0x92 + t_block * 20)
t_blk_address = np.fromfile(fid, np.uint32, 1)[0]
fid.seek(t_blk_address + 0x12)
n_logs = np.fromfile(fid, np.uint8, 1)[0]
fid.seek(t_blk_address + 0x14)
t_logs = np.fromfile(fid, "|S45", n_logs)
for t_log in t_logs:
t_logs = _read_event_log_block(fid, t_block, version)
t_sub_logs = None
if may_have_sub_blocks:
t_sub_logs = _read_event_log_block(fid, t_block + 22, version)

for i, t_log in enumerate(t_logs):
t_desc, t_onset = _parse_event_log(t_log)
if t_sub_logs is not None and t_sub_logs.size == t_logs.size:
t_sub_desc, t_sub_onset = _parse_sub_event_log(t_sub_logs[i])
t_desc += t_sub_desc
t_onset += t_sub_onset

t_desc = t_desc.rstrip(b"\x00")
for enc in _encodings:
try:
t_log = t_log.decode(enc)
t_desc = t_desc.decode(enc)
except UnicodeDecodeError:
pass
else:
break
else:
warn(f"Could not decode log as one of {_encodings}")
continue
t_desc = t_log[:20].strip("\x00")
t_onset = datetime.strptime(t_log[20:26], "%H%M%S")
t_onset = t_onset.hour * 3600 + t_onset.minute * 60 + t_onset.second

all_onsets.append(t_onset)
all_descriptions.append(t_desc)

Expand Down
Loading