15
15
import itertools
16
16
import logging
17
17
import struct
18
- from collections import OrderedDict , defaultdict
18
+ from collections import OrderedDict , defaultdict , namedtuple
19
19
from pathlib import Path
20
20
from xml .etree .ElementTree import ParseError , fromstring
21
21
@@ -79,6 +79,7 @@ def load_xdf(
79
79
select_streams = None ,
80
80
* ,
81
81
on_chunk = None ,
82
+ handle_non_monotonic = False ,
82
83
synchronize_clocks = True ,
83
84
handle_clock_resets = True ,
84
85
dejitter_timestamps = True ,
@@ -122,6 +123,13 @@ def load_xdf(
122
123
verbose : Passing True will set logging level to DEBUG, False will set it to
123
124
WARNING, and None will use root logger level. (default: None)
124
125
126
+ handle_non_monotonic : bool | None
127
+ Whether to warn only or sort samples that are not in ascending time
128
+ order. (default: False)
129
+ - bool : 'False' check only, warning when data are non-monotonic
130
+ 'True' check and sort non-monotonic data
131
+ - None : Disable non-monotonicity check
132
+
125
133
synchronize_clocks : Whether to enable clock synchronization based on
126
134
ClockOffset chunks. (default: true)
127
135
@@ -358,6 +366,33 @@ def load_xdf(
358
366
else :
359
367
stream .time_series = np .zeros ((0 , stream .nchns ))
360
368
369
+ # Perform initial non-monotonicity checks if requested
370
+ if handle_non_monotonic is None :
371
+ logger .debug (" skipping non-monotonicity check..." )
372
+ else :
373
+ logger .info (" performing non-monotonicity check..." )
374
+ mono_status = _check_monotonicity (temp )
375
+ # Are all time-values monotonic across all streams?
376
+ time_stamps_mono = all (mono_status ["time_stamps" ].values ())
377
+ clock_times_mono = all (mono_status ["clock_times" ].values ())
378
+ if time_stamps_mono and clock_times_mono :
379
+ # All streams are monotonic.
380
+ logger .info ("All streams are monotonic, continuing..." )
381
+ elif not handle_non_monotonic :
382
+ # Some data are non-monotonic, but we will not attempt to handle it.
383
+ if not synchronize_clocks and not clock_times_mono :
384
+ msg = (
385
+ "Clock offsets are non-monotonic but clocks are not going to "
386
+ "be synchronized. Consider loading with 'synchronize_clocks=True'."
387
+ )
388
+ logger .warning (msg )
389
+ else :
390
+ msg = (
391
+ "Non-monotonic streams detected - "
392
+ "consider loading with 'handle_non_monotonic=True'."
393
+ )
394
+ logger .warning (msg )
395
+
361
396
# perform (fault-tolerant) clock synchronization if requested
362
397
if synchronize_clocks :
363
398
logger .info (" performing clock synchronization..." )
@@ -371,6 +406,12 @@ def load_xdf(
371
406
winsor_threshold ,
372
407
)
373
408
409
+ # perform non-monotonicity handling if requested
410
+ if handle_non_monotonic :
411
+ logger .info (" sorting non-monotonic data..." )
412
+ for stream in temp .values ():
413
+ _sort_stream_data (stream )
414
+
374
415
# perform jitter removal if requested
375
416
if dejitter_timestamps :
376
417
logger .info (" performing jitter removal..." )
@@ -547,6 +588,153 @@ def _scan_forward(f):
547
588
return False
548
589
549
590
591
+ # Named tuple to represent monotonicity information.
592
+ Monotonicity = namedtuple ("Monotonicity" , ["result" , "n" , "dec_count" , "eq_count" ])
593
+ # - result: bool - True if all pairwise values are equal or increasing.
594
+ # - n: total number of pairwise values
595
+ # - dec_count: number of pairwise decreasing values
596
+ # - eq_count: number of pairwise equal values
597
+
598
+
599
+ def _monotonic_increasing (a ):
600
+ """Test for increasing (non-decreasing) monotonicity.
601
+
602
+ Parameters
603
+ ----------
604
+ a : array_like
605
+
606
+ Returns
607
+ -------
608
+ Monotonicity : namedtuple
609
+ """
610
+ # Intervals between successive values.
611
+ diffs = np .diff (a )
612
+
613
+ increasing = True
614
+ n = len (diffs )
615
+ dec_count = 0
616
+ eq_count = 0
617
+
618
+ # Count non-increasing intervals.
619
+ non_inc_count = np .sum (diffs <= 0 )
620
+
621
+ if non_inc_count > 0 :
622
+ # Count constant intervals.
623
+ eq_count = np .sum (diffs == 0 ).item ()
624
+ dec_count = (non_inc_count - eq_count ).item ()
625
+
626
+ if dec_count > 0 :
627
+ increasing = False
628
+ return Monotonicity (increasing , n , dec_count , eq_count )
629
+
630
+
631
def _check_monotonicity(streams):
    """Check monotonicity of all stream time values.

    Parameters
    ----------
    streams : dict
        Dictionary of stream_id: StreamData.

    Returns
    -------
    mono_status : dict[dict]
        Dictionary of attr: {stream_id: bool}
        - attr: StreamData attribute name ("time_stamps" or "clock_times")
        - stream_id: Stream ID
        - bool: stream attr data are monotonic
    """
    mono_status = {
        "time_stamps": {},
        "clock_times": {},
    }

    # Column width to right-align stream IDs in log messages. Using a
    # generator with default=0 avoids max() raising ValueError when the
    # stream dict is empty.
    id_align = max((len(str(s.stream_id)) for s in streams.values()), default=0)

    for stream in streams.values():
        for attr in mono_status:
            monotonic, n, dec_count, eq_count = _monotonic_increasing(
                getattr(stream, attr)
            )

            mono_status[attr][stream.stream_id] = monotonic

            msg = [
                (
                    f"Stream {stream.stream_id:>{id_align}}: {attr} are"
                    f"{'' if monotonic else ' NOT'} monotonic"
                )
            ]

            if dec_count > 0:
                # Percentage of strictly decreasing intervals; tiny
                # non-zero fractions are reported as "<0.1".
                dec_pc = round(dec_count / n * 100, 1)
                msg.append(f"dec={dec_count} ({'<0.1' if dec_pc < 0.1 else dec_pc}%)")

            if eq_count > 0:
                # Percentage of constant (identically timestamped) intervals.
                eq_pc = round(eq_count / n * 100, 1)
                msg.append(f"eq={eq_count} ({'<0.1' if eq_pc < 0.1 else eq_pc}%)")

            msg = ", ".join(msg)

            # Non-monotonic streams warrant a warning; monotonic ones are
            # logged at info level only.
            if monotonic:
                logger.info(msg)
            else:
                logger.warning(msg)
    return mono_status
685
+
686
+
687
def _sort_stream_data(stream):
    """Sort stream data by ground truth timestamps.

    Parameters
    ----------
    stream: StreamData (possibly non-monotonic)

    Returns
    -------
    stream : StreamData (monotonic)
        With sorted timestamps and timeseries data, if necessary.

    Non-monotonic streams are stable sorted in ascending order of ground
    truth timestamps. When clock resets have been detected sorting is
    applied within each clock segment, but clock segments themselves are
    not sorted.

    Both timestamp and timeseries arrays are modified, keeping all
    samples aligned with their original timestamp.

    Monotonic streams/segments are not modified.

    Stream may still contain identically timestamped samples, but these
    can be handled by dejittering.
    """
    # With zero or one samples there is nothing to sort.
    if len(stream.time_stamps) <= 1:
        return stream
    segments = stream.clock_segments
    if len(segments) == 0:
        # Clocks have not been synchronized: treat the whole stream as a
        # single segment (bounds are inclusive).
        segments = [(0, len(stream.time_stamps) - 1)]
    for first, last in segments:
        seg = slice(first, last + 1)
        if _monotonic_increasing(stream.time_stamps[seg]).result:
            # Segment already in ascending order; leave it untouched.
            continue
        logger.info(
            f"Sorting stream {stream.stream_id}: clock segment {first}-{last}."
        )
        # Stable sort keeps identically timestamped samples in their
        # original relative order.
        order = np.argsort(stream.time_stamps[seg], kind="stable")
        # Reorder timestamps in place, then the timeseries data so each
        # sample stays aligned with its timestamp.
        stream.time_stamps[seg] = stream.time_stamps[seg][order]
        if stream.fmt == "string":
            # String data are stored as a list: round-trip through an
            # array to apply the fancy-index reordering.
            stream.time_series[seg] = np.array(stream.time_series[seg])[order].tolist()
        else:
            stream.time_series[seg] = stream.time_series[seg][order]
    return stream
736
+
737
+
550
738
def _find_segment_indices (b_breaks ):
551
739
"""Convert boundary breaks array to segment indices.
552
740
0 commit comments