Skip to content

Commit 55cfe97

Browse files
committed
Optionally handle non-monotonic data
* New load_xdf parameter: handle_non_monotonic (default: False). None disables the monotonicity check; False checks and warns only if non-monotonic data are detected; True attempts to sort non-monotonic data. * Additional tests for monotonicity.
1 parent 7a34d29 commit 55cfe97

File tree

6 files changed

+673
-5
lines changed

6 files changed

+673
-5
lines changed

src/pyxdf/pyxdf.py

Lines changed: 190 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import itertools
1616
import logging
1717
import struct
18-
from collections import OrderedDict, defaultdict
18+
from collections import OrderedDict, defaultdict, namedtuple
1919
from pathlib import Path
2020
from xml.etree.ElementTree import ParseError, fromstring
2121

@@ -79,6 +79,7 @@ def load_xdf(
7979
select_streams=None,
8080
*,
8181
on_chunk=None,
82+
handle_non_monotonic=False,
8283
synchronize_clocks=True,
8384
handle_clock_resets=True,
8485
dejitter_timestamps=True,
@@ -122,6 +123,13 @@ def load_xdf(
122123
verbose : Passing True will set logging level to DEBUG, False will set it to
123124
WARNING, and None will use root logger level. (default: None)
124125
126+
handle_non_monotonic : bool | None
127+
Whether to warn only or sort samples that are not in ascending time
128+
order. (default: False)
129+
- bool : 'False' check only, warning when data are non-monotonic
130+
'True' check and sort non-monotonic data
131+
- None : Disable non-monotonicity check
132+
125133
synchronize_clocks : Whether to enable clock synchronization based on
126134
ClockOffset chunks. (default: true)
127135
@@ -358,6 +366,33 @@ def load_xdf(
358366
else:
359367
stream.time_series = np.zeros((0, stream.nchns))
360368

369+
# Perform initial non-monotonicity checks if requested
370+
if handle_non_monotonic is None:
371+
logger.debug(" skipping non-monotonicity check...")
372+
else:
373+
logger.info(" performing non-monotonicity check...")
374+
mono_status = _check_monotonicity(temp)
375+
# Are all time-values monotonic across all streams?
376+
time_stamps_mono = all(mono_status["time_stamps"].values())
377+
clock_times_mono = all(mono_status["clock_times"].values())
378+
if time_stamps_mono and clock_times_mono:
379+
# All streams are monotonic.
380+
logger.info("All streams are monotonic, continuing...")
381+
elif not handle_non_monotonic:
382+
# Some data are non-monotonic, but we will not attempt to handle it.
383+
if not synchronize_clocks and not clock_times_mono:
384+
msg = (
385+
"Clock offsets are non-monotonic but clocks are not going to "
386+
"be synchronized. Consider loading with 'synchronize_clocks=True'."
387+
)
388+
logger.warning(msg)
389+
else:
390+
msg = (
391+
"Non-monotonic streams detected - "
392+
"consider loading with 'handle_non_monotonic=True'."
393+
)
394+
logger.warning(msg)
395+
361396
# perform (fault-tolerant) clock synchronization if requested
362397
if synchronize_clocks:
363398
logger.info(" performing clock synchronization...")
@@ -371,6 +406,12 @@ def load_xdf(
371406
winsor_threshold,
372407
)
373408

409+
# perform non-monotonicity handling if requested
410+
if handle_non_monotonic:
411+
logger.info(" sorting non-monotonic data...")
412+
for stream in temp.values():
413+
_sort_stream_data(stream)
414+
374415
# perform jitter removal if requested
375416
if dejitter_timestamps:
376417
logger.info(" performing jitter removal...")
@@ -547,6 +588,154 @@ def _scan_forward(f):
547588
return False
548589

549590

591+
# Named tuple to represent monotonicity information.
592+
Monotonicity = namedtuple("Monotonicity", ["result", "n", "dec_count", "eq_count"])
593+
# - result: bool - True if all pairwise values are equal or increasing.
594+
# - n: total number of pairwise values
595+
# - dec_count: number of pairwise decreasing values
596+
# - eq_count: number of pairwise equal values
597+
598+
599+
def _monotonic_increasing(a):
600+
"""Test for increasing (non-decreasing) monotonicity.
601+
602+
Parameters
603+
----------
604+
a : array_like
605+
606+
Returns
607+
-------
608+
Monotonicity : namedtuple
609+
"""
610+
# Intervals between successive values.
611+
diffs = np.diff(a)
612+
613+
increasing = True
614+
n = len(diffs)
615+
dec_count = 0
616+
eq_count = 0
617+
618+
# Count non-increasing intervals.
619+
non_inc_count = np.sum(diffs <= 0)
620+
621+
if non_inc_count > 0:
622+
# Count constant intervals.
623+
eq_count = np.sum(diffs == 0).item()
624+
dec_count = (non_inc_count - eq_count).item()
625+
626+
if dec_count > 0:
627+
increasing = False
628+
return Monotonicity(increasing, n, dec_count, eq_count)
629+
630+
631+
def _check_monotonicity(streams):
    """Check monotonicity of all stream time values.

    Parameters
    ----------
    streams : dict
        Dictionary of stream_id: StreamData.

    Returns
    -------
    mono_status : dict[dict]
        Dictionary of attr: [stream_id: bool]
        - attr: StreamData attribute name
        - stream_id: Stream ID
        - bool: stream attr data are monotonic
    """
    attrs = ("time_stamps", "clock_times")
    mono_status = {attr: {} for attr in attrs}

    # Width of the widest stream ID, used to right-align IDs in log output.
    widest = max(streams.values(), key=lambda s: s.stream_id)
    id_align = len(str(widest.stream_id))

    for stream in streams.values():
        for attr in attrs:
            mono = _monotonic_increasing(getattr(stream, attr))
            mono_status[attr][stream.stream_id] = mono.result

            parts = [
                (
                    f"Stream {stream.stream_id:>{id_align}}: {attr} are"
                    f"{'' if mono.result else ' NOT'} monotonic"
                )
            ]

            # Report decreasing/equal interval counts with a percentage,
            # flooring the displayed percentage at "<0.1".
            if mono.dec_count > 0:
                pc = round(mono.dec_count / mono.n * 100, 1)
                parts.append(f"dec={mono.dec_count} ({'<0.1' if pc < 0.1 else pc}%)")
            if mono.eq_count > 0:
                pc = round(mono.eq_count / mono.n * 100, 1)
                parts.append(f"eq={mono.eq_count} ({'<0.1' if pc < 0.1 else pc}%)")

            # Monotonic streams are merely informational; others warn.
            log = logger.info if mono.result else logger.warning
            log(", ".join(parts))
    return mono_status
685+
686+
687+
def _sort_stream_data(stream):
    """Sort stream data by ground truth timestamps.

    Parameters
    ----------
    stream: StreamData (possibly non-monotonic)

    Returns
    -------
    stream : StreamData (monotonic)
        With sorted timestamps and timeseries data, if necessary.

    Non-monotonic streams are stable sorted in ascending order of ground
    truth timestamps. When clock resets have been detected sorting is
    applied within each clock segment, but clock segments themselves are
    not sorted.

    Both timestamp and timeseries arrays are modified, keeping all
    samples aligned with their original timestamp.

    Monotonic streams/segments are not modified.

    Stream may still contain identically timestamped samples, but these
    can be handled by dejittering.
    """
    if len(stream.time_stamps) <= 1:
        # Zero or one sample: nothing to sort.
        return stream
    segments = stream.clock_segments
    if len(segments) == 0:
        # Clocks have not been synchronized: treat the whole stream as a
        # single segment (bounds are inclusive).
        segments = [(0, len(stream.time_stamps) - 1)]
    for first, last in segments:
        seg = slice(first, last + 1)
        if _monotonic_increasing(stream.time_stamps[seg]).result:
            # Segment is already in order; leave it untouched.
            continue
        logger.info(
            f"Sorting stream {stream.stream_id}: "
            f"clock segment {first}-{last}."
        )
        # Stable sort keeps identically timestamped samples in their
        # original relative order.
        order = np.argsort(stream.time_stamps[seg], kind="stable")
        # Reorder timestamps in place.
        stream.time_stamps[seg] = stream.time_stamps[seg][order]
        # Reorder timeseries data, keeping samples aligned with their
        # timestamps.
        if stream.fmt == "string":
            # String data live in a list; round-trip through an array to
            # apply the fancy-index reordering.
            stream.time_series[seg] = np.array(stream.time_series[seg])[
                order
            ].tolist()
        else:
            stream.time_series[seg] = stream.time_series[seg][order]
    return stream
737+
738+
550739
def _find_segment_indices(b_breaks):
551740
"""Convert boundary breaks array to segment indices.
552741

0 commit comments

Comments
 (0)