15
15
import itertools
16
16
import logging
17
17
import struct
18
- from collections import OrderedDict , defaultdict
18
+ from collections import OrderedDict , defaultdict , namedtuple
19
19
from pathlib import Path
20
20
from xml .etree .ElementTree import ParseError , fromstring
21
21
@@ -77,6 +77,7 @@ def load_xdf(
77
77
select_streams = None ,
78
78
* ,
79
79
on_chunk = None ,
80
+ handle_non_monotonic = False ,
80
81
synchronize_clocks = True ,
81
82
handle_clock_resets = True ,
82
83
dejitter_timestamps = True ,
@@ -120,6 +121,13 @@ def load_xdf(
120
121
verbose : Passing True will set logging level to DEBUG, False will set it to
121
122
WARNING, and None will use root logger level. (default: None)
122
123
124
+ handle_non_monotonic : bool | None
125
+ Whether to warn only or sort samples that are not in ascending time
126
+ order. (default: False)
127
+ - bool : 'False' check only, warning when data are non-monotonic
128
+ 'True' check and sort non-monotonic data
129
+ - None : Disable non-monotonicity check
130
+
123
131
synchronize_clocks : Whether to enable clock synchronization based on
124
132
ClockOffset chunks. (default: true)
125
133
@@ -356,6 +364,33 @@ def load_xdf(
356
364
else :
357
365
stream .time_series = np .zeros ((0 , stream .nchns ))
358
366
367
+ # Perform initial non-monotonicity checks if requested
368
+ if handle_non_monotonic is None :
369
+ logger .debug (" skipping non-monotonicity check..." )
370
+ else :
371
+ logger .info (" performing non-monotonicity check..." )
372
+ mono_status = _check_monotonicity (temp )
373
+ # Are all time-values monotonic across all streams?
374
+ time_stamps_mono = all (mono_status ["time_stamps" ].values ())
375
+ clock_times_mono = all (mono_status ["clock_times" ].values ())
376
+ if time_stamps_mono and clock_times_mono :
377
+ # All streams are monotonic.
378
+ logger .info ("All streams are monotonic, continuing..." )
379
+ elif not handle_non_monotonic :
380
+ # Some data are non-monotonic, but we will not attempt to handle it.
381
+ if not synchronize_clocks and not clock_times_mono :
382
+ msg = (
383
+ "Clock offsets are non-monotonic but clocks are not going to "
384
+ "be synchronized. Consider loading with 'synchronize_clocks=True'."
385
+ )
386
+ logger .warning (msg )
387
+ else :
388
+ msg = (
389
+ "Non-monotonic streams detected - "
390
+ "consider loading with 'handle_non_monotonic=True'."
391
+ )
392
+ logger .warning (msg )
393
+
359
394
# perform (fault-tolerant) clock synchronization if requested
360
395
if synchronize_clocks :
361
396
logger .info (" performing clock synchronization..." )
@@ -369,6 +404,12 @@ def load_xdf(
369
404
winsor_threshold ,
370
405
)
371
406
407
+ # perform non-monotonicity handling if requested
408
+ if handle_non_monotonic :
409
+ logger .info (" sorting non-monotonic data..." )
410
+ for stream_id , stream in temp .items ():
411
+ _sort_stream_data (stream_id , stream )
412
+
372
413
# perform jitter removal if requested
373
414
if dejitter_timestamps :
374
415
logger .info (" performing jitter removal..." )
@@ -543,6 +584,153 @@ def _scan_forward(f):
543
584
return False
544
585
545
586
587
+ # Named tuple to represent monotonicity information.
588
+ Monotonicity = namedtuple ("Monotonicity" , ["result" , "n" , "dec_count" , "eq_count" ])
589
+ # - result: bool - True if all pairwise values are equal or increasing.
590
+ # - n: total number of pairwise values
591
+ # - dec_count: number of pairwise decreasing values
592
+ # - eq_count: number of pairwise equal values
593
+
594
+
595
+ def _monotonic_increasing (a ):
596
+ """Test for increasing (non-decreasing) monotonicity.
597
+
598
+ Parameters
599
+ ----------
600
+ a : array_like
601
+
602
+ Returns
603
+ -------
604
+ Monotonicity : namedtuple
605
+ """
606
+ # Intervals between successive values.
607
+ diffs = np .diff (a )
608
+
609
+ increasing = True
610
+ n = len (diffs )
611
+ dec_count = 0
612
+ eq_count = 0
613
+
614
+ # Count non-increasing intervals.
615
+ non_inc_count = np .sum (diffs <= 0 )
616
+
617
+ if non_inc_count > 0 :
618
+ # Count constant intervals.
619
+ eq_count = np .sum (diffs == 0 ).item ()
620
+ dec_count = (non_inc_count - eq_count ).item ()
621
+
622
+ if dec_count > 0 :
623
+ increasing = False
624
+ return Monotonicity (increasing , n , dec_count , eq_count )
625
+
626
+
627
def _check_monotonicity(streams):
    """Check monotonicity of all stream time values.

    Parameters
    ----------
    streams : dict
        Dictionary of stream_id: StreamData.

    Returns
    -------
    mono_status : dict[dict]
        Dictionary of attr: [stream_id: bool]
        - attr: StreamData attribute name
        - stream_id: Stream ID
        - bool: stream attr data are monotonic

    A per-stream summary line is logged at INFO level for monotonic
    data and at WARNING level otherwise.
    """
    mono_status = {"time_stamps": {}, "clock_times": {}}

    # Right-align stream IDs in log messages to the widest ID.
    id_align = len(str(max(streams)))

    for stream_id, stream in streams.items():
        for attr in mono_status:
            mono = _monotonic_increasing(getattr(stream, attr))
            mono_status[attr][stream_id] = mono.result

            parts = [
                (
                    f"Stream {stream_id:>{id_align}}: {attr} are"
                    f"{'' if mono.result else ' NOT'} monotonic"
                )
            ]

            # Report decreasing/equal interval counts with percentages,
            # flooring tiny percentages to a "<0.1" label.
            if mono.dec_count > 0:
                dec_pc = round(mono.dec_count / mono.n * 100, 1)
                parts.append(
                    f"dec={mono.dec_count} ({'<0.1' if dec_pc < 0.1 else dec_pc}%)"
                )
            if mono.eq_count > 0:
                eq_pc = round(mono.eq_count / mono.n * 100, 1)
                parts.append(
                    f"eq={mono.eq_count} ({'<0.1' if eq_pc < 0.1 else eq_pc}%)"
                )

            message = ", ".join(parts)
            if mono.result:
                logger.info(message)
            else:
                logger.warning(message)
    return mono_status
681
+
682
+
683
def _sort_stream_data(stream_id, stream):
    """Sort stream data by ground truth timestamps.

    Parameters
    ----------
    stream: StreamData (possibly non-monotonic)

    Returns
    -------
    stream : StreamData (monotonic)
        With sorted timestamps and timeseries data, if necessary.

    Non-monotonic streams are stable sorted in ascending order of ground
    truth timestamps. When clock resets have been detected sorting is
    applied within each clock segment, but clock segments themselves are
    not sorted.

    Both timestamp and timeseries arrays are modified, keeping all
    samples aligned with their original timestamp.

    Monotonic streams/segments are not modified.

    Stream may still contain identically timestamped samples, but these
    can be handled by dejittering.
    """
    n_samples = len(stream.time_stamps)
    if n_samples <= 1:
        return stream
    segments = stream.clock_segments
    if len(segments) == 0:
        # Clocks have not been synchronized.
        segments = [(0, n_samples - 1)]  # inclusive
    for start_i, end_i in segments:
        seg = slice(start_i, end_i + 1)
        if _monotonic_increasing(stream.time_stamps[seg]).result:
            # Segment is already in order; leave it untouched.
            continue
        logger.info(
            f"Sorting stream {stream_id}: clock segment {start_i}-{end_i}."
        )
        # Stable ordering keeps equal timestamps in original order.
        order = np.argsort(stream.time_stamps[seg], kind="stable")
        # Reorder timestamps in place.
        stream.time_stamps[seg] = stream.time_stamps[seg][order]
        # Reorder timeseries data, keeping samples aligned with their
        # timestamps. String streams are stored as lists, so round-trip
        # through an array for fancy indexing.
        if stream.fmt == "string":
            stream.time_series[seg] = (
                np.array(stream.time_series[seg])[order].tolist()
            )
        else:
            stream.time_series[seg] = stream.time_series[seg][order]
    return stream
732
+
733
+
546
734
def _find_segment_indices (b_breaks ):
547
735
"""Convert boundary breaks array to segment indices.
548
736
0 commit comments