15
15
import itertools
16
16
import logging
17
17
import struct
18
- from collections import OrderedDict , defaultdict
18
+ from collections import OrderedDict , defaultdict , namedtuple
19
19
from pathlib import Path
20
20
from xml .etree .ElementTree import ParseError , fromstring
21
21
@@ -77,6 +77,7 @@ def load_xdf(
77
77
select_streams = None ,
78
78
* ,
79
79
on_chunk = None ,
80
+ handle_non_monotonic = False ,
80
81
synchronize_clocks = True ,
81
82
handle_clock_resets = True ,
82
83
dejitter_timestamps = True ,
@@ -120,6 +121,13 @@ def load_xdf(
120
121
verbose : Passing True will set logging level to DEBUG, False will set it to
121
122
WARNING, and None will use root logger level. (default: None)
122
123
124
+ handle_non_monotonic : bool | None
125
+ Whether to warn only or sort samples that are not in ascending time
126
+ order. (default: False)
127
+ - bool : 'False' check only, warning when data are non-monotonic
128
+ 'True' check and sort non-monotonic data
129
+ - None : Disable non-monotonicity check
130
+
123
131
synchronize_clocks : Whether to enable clock synchronization based on
124
132
ClockOffset chunks. (default: true)
125
133
@@ -356,6 +364,33 @@ def load_xdf(
356
364
else :
357
365
stream .time_series = np .zeros ((0 , stream .nchns ))
358
366
367
+ # Perform initial non-monotonicity checks if requested
368
+ if handle_non_monotonic is None :
369
+ logger .debug (" skipping non-monotonicity check..." )
370
+ else :
371
+ logger .info (" performing non-monotonicity check..." )
372
+ mono_status = _check_monotonicity (temp )
373
+ # Are all time-values monotonic across all streams?
374
+ time_stamps_mono = all (mono_status ["time_stamps" ].values ())
375
+ clock_times_mono = all (mono_status ["clock_times" ].values ())
376
+ if time_stamps_mono and clock_times_mono :
377
+ # All streams are monotonic.
378
+ logger .info ("All streams are monotonic, continuing..." )
379
+ elif not handle_non_monotonic :
380
+ # Some data are non-monotonic, but we will not attempt to handle it.
381
+ if not synchronize_clocks and not clock_times_mono :
382
+ msg = (
383
+ "Clock offsets are non-monotonic but clocks are not going to "
384
+ "be synchronized. Consider loading with 'synchronize_clocks=True'."
385
+ )
386
+ logger .warning (msg )
387
+ else :
388
+ msg = (
389
+ "Non-monotonic streams detected - "
390
+ "consider loading with 'handle_non_monotonic=True'."
391
+ )
392
+ logger .warning (msg )
393
+
359
394
# perform (fault-tolerant) clock synchronization if requested
360
395
if synchronize_clocks :
361
396
logger .info (" performing clock synchronization..." )
@@ -369,6 +404,12 @@ def load_xdf(
369
404
winsor_threshold ,
370
405
)
371
406
407
+ # perform non-monotonicity handling if requested
408
+ if handle_non_monotonic :
409
+ logger .info (" sorting non-monotonic data..." )
410
+ for stream_id , stream in temp .items ():
411
+ _sort_stream_data (stream_id , stream )
412
+
372
413
# perform jitter removal if requested
373
414
if dejitter_timestamps :
374
415
logger .info (" performing jitter removal..." )
@@ -542,6 +583,151 @@ def _scan_forward(f):
542
583
return False
543
584
544
585
586
+ # Named tuple to represent monotonicity information.
587
+ Monotonicity = namedtuple ("Monotonicity" , ["result" , "n" , "dec_count" , "eq_count" ])
588
+ # - result: bool - True if all pairwise values are equal or increasing.
589
+ # - n: total number of pairwise values
590
+ # - dec_count: number of pairwise decreasing values
591
+ # - eq_count: number of pairwise equal values
592
+
593
+
594
+ def _monotonic_increasing (a ):
595
+ """Test for increasing (non-decreasing) monotonicity.
596
+
597
+ Parameters
598
+ ----------
599
+ a : array_like
600
+
601
+ Returns
602
+ -------
603
+ Monotonicity : namedtuple
604
+ """
605
+ # Intervals between successive values.
606
+ diffs = np .diff (a )
607
+
608
+ increasing = True
609
+ n = len (diffs )
610
+ dec_count = 0
611
+ eq_count = 0
612
+
613
+ # Count non-increasing intervals.
614
+ non_inc_count = np .sum (diffs <= 0 )
615
+
616
+ if non_inc_count > 0 :
617
+ # Count constant intervals.
618
+ eq_count = np .sum (diffs == 0 ).item ()
619
+ dec_count = (non_inc_count - eq_count ).item ()
620
+
621
+ if dec_count > 0 :
622
+ increasing = False
623
+ return Monotonicity (increasing , n , dec_count , eq_count )
624
+
625
+
626
def _check_monotonicity(streams):
    """Check monotonicity of all stream time values.

    Parameters
    ----------
    streams : dict
        Dictionary of stream_id: StreamData.

    Returns
    -------
    mono_status : dict[dict]
        Dictionary of attr: [stream_id: bool]
        - attr: StreamData attribute name
        - stream_id: Stream ID
        - bool: stream attr data are monotonic
    """
    mono_status = {"time_stamps": {}, "clock_times": {}}

    # Right-align stream IDs in log messages to the widest ID.
    id_align = len(str(max(streams)))

    for stream_id, stream in streams.items():
        for attr in mono_status:
            info = _monotonic_increasing(getattr(stream, attr))
            mono_status[attr][stream_id] = info.result

            parts = [
                (
                    f"Stream {stream_id:>{id_align}}: {attr} are"
                    f"{'' if info.result else ' NOT'} monotonic"
                )
            ]

            # Report decreasing/equal pair counts with a percentage of all
            # pairs; tiny non-zero fractions are shown as "<0.1".
            for label, count in (("dec", info.dec_count), ("eq", info.eq_count)):
                if count > 0:
                    pc = round(count / info.n * 100, 1)
                    parts.append(f"{label}={count} ({'<0.1' if pc < 0.1 else pc}%)")

            log = logger.info if info.result else logger.warning
            log(", ".join(parts))
    return mono_status
680
+
681
+
682
def _sort_stream_data(stream_id, stream):
    """Sort stream data by ground truth timestamps.

    Parameters
    ----------
    stream_id : int
        Stream ID, used only for log messages.
    stream : StreamData (possibly non-monotonic)

    Returns
    -------
    stream : StreamData (monotonic)
        With sorted timestamps and timeseries data, if necessary.

    Non-monotonic streams are stable sorted in ascending order of ground
    truth timestamps. When clock resets have been detected sorting is
    applied within each clock segment, but clock segments themselves are
    not sorted.

    Both timestamp and timeseries arrays are modified, keeping all
    samples aligned with their original timestamp.

    Monotonic streams/segments are not modified.

    Stream may still contain identically timestamped samples, but these
    can be handled by dejittering.
    """
    n_samples = len(stream.time_stamps)
    # Nothing to order with fewer than two samples.
    if n_samples <= 1:
        return stream

    segments = stream.clock_segments
    if len(segments) == 0:
        # Clocks have not been synchronized: treat the whole stream as one
        # segment (bounds are inclusive).
        segments = [(0, n_samples - 1)]

    for first, last in segments:
        seg = slice(first, last + 1)
        ts = stream.time_stamps[seg]
        if _monotonic_increasing(ts).result:
            # Already in order — leave this segment untouched.
            continue
        logger.info(f"Sorting stream {stream_id}: clock segment {first}-{last}.")
        # Stable ordering keeps equal timestamps in original sample order.
        order = np.argsort(ts, kind="stable")
        # Reorder timestamps in place.
        stream.time_stamps[seg] = ts[order]
        # Reorder timeseries data, keeping samples aligned with timestamps.
        if stream.fmt == "string":
            # String data are stored as a list; round-trip through ndarray
            # for fancy indexing, then convert back.
            stream.time_series[seg] = np.array(stream.time_series[seg])[order].tolist()
        else:
            stream.time_series[seg] = stream.time_series[seg][order]
    return stream
729
+
730
+
545
731
def _find_segment_indices (b_breaks ):
546
732
"""Convert boundary breaks array to segment indices.
547
733
0 commit comments