-
Notifications
You must be signed in to change notification settings - Fork 19
Optionally handle non monotonic time-series #132
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,8 @@ | |
import itertools | ||
import logging | ||
import struct | ||
from collections import OrderedDict, defaultdict | ||
from collections import OrderedDict, defaultdict, namedtuple | ||
from enum import Enum | ||
from pathlib import Path | ||
from xml.etree.ElementTree import ParseError, fromstring | ||
|
||
|
@@ -26,6 +27,13 @@ | |
logger = logging.getLogger(__name__) | ||
|
||
|
||
class HandleNonMonoState(Enum):
    """Valid values for the ``handle_non_monotonic`` option of ``load_xdf``.

    - ``DISABLE`` (None): skip the non-monotonicity check entirely.
    - ``WARN`` ("warn"): check and warn, but leave the data untouched.
    - ``TRUST_TIMESERIES`` ("trust_timeseries"): sort non-monotonic
      time-stamps only, preserving sample sequence order.
    - ``TRUST_TIMESTAMPS`` ("trust_timestamps"): sort time-stamps and samples
      together, preserving time-stamp and sample alignment.
    """

    DISABLE = None
    WARN = "warn"
    TRUST_TIMESERIES = "trust_timeseries"
    TRUST_TIMESTAMPS = "trust_timestamps"

    @classmethod
    def _missing_(cls, value):
        # The default Enum ValueError for an unsupported value does not tell
        # the user which values are supported - raise a descriptive error
        # instead so the mistake is immediately actionable.
        valid = ", ".join(repr(member.value) for member in cls)
        raise ValueError(
            f"{value!r} is not a valid {cls.__name__}; expected one of: {valid}"
        )
|
||
|
||
class StreamData: | ||
"""Temporary per-stream data.""" | ||
|
||
|
@@ -77,6 +85,7 @@ def load_xdf( | |
select_streams=None, | ||
*, | ||
on_chunk=None, | ||
handle_non_monotonic="warn", | ||
synchronize_clocks=True, | ||
handle_clock_resets=True, | ||
dejitter_timestamps=True, | ||
|
@@ -120,6 +129,15 @@ def load_xdf( | |
verbose : Passing True will set logging level to DEBUG, False will set it to | ||
WARNING, and None will use root logger level. (default: None) | ||
|
||
handle_non_monotonic : str | None | ||
Whether to warn only or ensure data is in ascending time order. | ||
- "warn": check only, warning when data are non-monotonic (default) | ||
- "trust_timeseries": check and sort non-monotonic time-stamps only, | ||
preserving sample sequence order. | ||
- "trust_timestamps": check and sort non-monotonic time-stamps and samples, | ||
preserving time-stamp and sample alignment. | ||
- None: Disable non-monotonicity check. | ||
|
||
synchronize_clocks : Whether to enable clock synchronization based on | ||
ClockOffset chunks. (default: true) | ||
|
||
|
@@ -202,6 +220,9 @@ def load_xdf( | |
|
||
logger.info("Importing XDF file %s..." % filename) | ||
|
||
# Validate handle_non_monotonic argument. | ||
handle_non_monotonic = HandleNonMonoState(handle_non_monotonic) | ||
Comment on lines
+223
to
+224
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Although this validates the input, the error message you get for an unsupported value is not very helpful (presumably the bare `ValueError` raised by the Enum constructor, which does not list the supported values).
||
|
||
# if select_streams is an int or a list of int, load only streams associated with | ||
# the corresponding stream IDs | ||
# if select_streams is a list of dicts, use this to query and load streams | ||
|
@@ -356,6 +377,34 @@ def load_xdf( | |
else: | ||
stream.time_series = np.zeros((0, stream.nchns)) | ||
|
||
# Perform initial non-monotonicity checks if requested | ||
if handle_non_monotonic is HandleNonMonoState.DISABLE: | ||
logger.info(" skipping non-monotonicity check...") | ||
else: | ||
logger.info(" performing non-monotonicity check...") | ||
mono_status = _check_monotonicity(temp) | ||
# Are all time-values monotonic across all streams? | ||
time_stamps_mono = all(mono_status["time_stamps"].values()) | ||
clock_times_mono = all(mono_status["clock_times"].values()) | ||
if time_stamps_mono and clock_times_mono: | ||
# All streams are monotonic. | ||
logger.info("All streams are monotonic, continuing...") | ||
elif handle_non_monotonic is HandleNonMonoState.WARN: | ||
# Some data are non-monotonic, but we will not attempt to handle it. | ||
if not synchronize_clocks and not clock_times_mono: | ||
msg = ( | ||
"Clock offsets are non-monotonic but clocks are not going to " | ||
"be synchronized. Consider loading with 'synchronize_clocks=True'." | ||
) | ||
logger.warning(msg) | ||
else: | ||
msg = ( | ||
"Non-monotonic streams detected - " | ||
"consider loading with handle_non_monotonic='trust_timeseries' " | ||
"or 'trust_timestamps'" | ||
) | ||
logger.warning(msg) | ||
|
||
# perform (fault-tolerant) clock synchronization if requested | ||
if synchronize_clocks: | ||
logger.info(" performing clock synchronization...") | ||
|
@@ -369,6 +418,19 @@ def load_xdf( | |
winsor_threshold, | ||
) | ||
|
||
# perform non-monotonicity handling if requested | ||
if ( | ||
handle_non_monotonic is HandleNonMonoState.TRUST_TIMESERIES | ||
or handle_non_monotonic is HandleNonMonoState.TRUST_TIMESTAMPS | ||
): | ||
logger.info(f" sorting non-monotonic data: {handle_non_monotonic.name}...") | ||
for stream_id, stream in temp.items(): | ||
_sort_stream_data( | ||
stream_id, | ||
stream, | ||
handle_non_monotonic is HandleNonMonoState.TRUST_TIMESTAMPS, | ||
) | ||
|
||
# perform jitter removal if requested | ||
if dejitter_timestamps: | ||
logger.info(" performing jitter removal...") | ||
|
@@ -542,6 +604,157 @@ def _scan_forward(f): | |
return False | ||
|
||
|
||
# Named tuple to represent monotonicity information. | ||
Monotonicity = namedtuple("Monotonicity", ["result", "n", "dec_count", "eq_count"]) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this need to be public? Also, and especially if this is just private, does this need to be a |
||
# - result: bool - True if all pairwise values are equal or increasing. | ||
# - n: total number of pairwise values | ||
# - dec_count: number of pairwise decreasing values | ||
# - eq_count: number of pairwise equal values | ||
|
||
|
||
def _monotonic_increasing(a): | ||
"""Test for increasing (non-decreasing) monotonicity. | ||
|
||
Parameters | ||
---------- | ||
a : array_like | ||
|
||
Returns | ||
------- | ||
Monotonicity : namedtuple | ||
""" | ||
# Intervals between successive values. | ||
diffs = np.diff(a) | ||
|
||
increasing = True | ||
n = len(diffs) | ||
dec_count = 0 | ||
eq_count = 0 | ||
|
||
# Count non-increasing intervals. | ||
non_inc_count = np.sum(diffs <= 0) | ||
|
||
if non_inc_count > 0: | ||
# Count constant intervals. | ||
eq_count = np.sum(diffs == 0).item() | ||
dec_count = (non_inc_count - eq_count).item() | ||
|
||
if dec_count > 0: | ||
increasing = False | ||
return Monotonicity(increasing, n, dec_count, eq_count) | ||
|
||
|
||
def _check_monotonicity(streams):
    """Check monotonicity of all stream time values.

    Logs one message per stream and attribute: INFO when monotonic, WARNING
    otherwise, including counts of decreasing/equal intervals where present.

    Parameters
    ----------
    streams : dict
        Dictionary of stream_id: StreamData.

    Returns
    -------
    mono_status : dict[dict]
        Dictionary of attr: [stream_id: bool]
        - attr: StreamData attribute name
        - stream_id: Stream ID
        - bool: stream attr data are monotonic
    """
    mono_status = {
        "time_stamps": {},
        "clock_times": {},
    }

    # Right-align stream IDs in log messages to the width of the largest ID.
    id_align = len(str(max(streams.keys())))

    for stream_id, stream in streams.items():
        for attr in mono_status:
            status = _monotonic_increasing(getattr(stream, attr))
            mono_status[attr][stream_id] = status.result

            parts = [
                (
                    f"Stream {stream_id:>{id_align}}: {attr} are"
                    f"{'' if status.result else ' NOT'} monotonic"
                )
            ]

            if status.dec_count > 0:
                dec_pc = round(status.dec_count / status.n * 100, 1)
                parts.append(
                    f"dec={status.dec_count} ({'<0.1' if dec_pc < 0.1 else dec_pc}%)"
                )

            if status.eq_count > 0:
                eq_pc = round(status.eq_count / status.n * 100, 1)
                parts.append(
                    f"eq={status.eq_count} ({'<0.1' if eq_pc < 0.1 else eq_pc}%)"
                )

            message = ", ".join(parts)

            if status.result:
                logger.info(message)
            else:
                logger.warning(message)
    return mono_status
|
||
|
||
def _sort_stream_data(stream_id, stream, reorder_timeseries=False):
    """Sort stream timestamps and optionally reorder timeseries data.

    Parameters
    ----------
    stream_id : int
    stream : StreamData (possibly non-monotonic)
    reorder_timeseries : bool (default: False)

    Returns
    -------
    stream : StreamData (monotonic)
        With sorted timestamps and optionally reordered timeseries data.

    Non-monotonic ground-truth timestamps are stable-sorted into ascending
    order. When clock resets have been detected, sorting is applied within
    each clock segment; the segments themselves are never reordered.

    With ``reorder_timeseries=False`` only the timestamp array is modified;
    with ``reorder_timeseries=True`` the timeseries is permuted alongside so
    every sample stays aligned with its original timestamp.

    Streams/segments that are already monotonic are left untouched. Identical
    timestamps may remain; dejittering can handle those later.
    """
    timestamps = stream.time_stamps
    if len(timestamps) <= 1:
        # Nothing to sort.
        return stream
    segments = stream.clock_segments
    if len(segments) == 0:
        # Clocks have not been synchronized: treat the whole stream as one
        # segment with inclusive bounds.
        segments = [(0, len(timestamps) - 1)]
    for first, last in segments:
        segment = slice(first, last + 1)
        if _monotonic_increasing(timestamps[segment]).result:
            # Segment already monotonic - leave it untouched.
            continue
        logger.info(f"Sorting stream {stream_id}: clock segment {first}-{last}.")
        # Determine the stable monotonic ordering and apply it in place.
        order = np.argsort(timestamps[segment], kind="stable")
        timestamps[segment] = timestamps[segment][order]
        if not reorder_timeseries:
            continue
        # Reorder timeseries data to align with the sorted timestamps.
        if stream.fmt == "string":
            # String data are stored as a list: permute via a temporary array.
            reordered = np.array(stream.time_series[segment])[order]
            stream.time_series[segment] = reordered.tolist()
        else:
            stream.time_series[segment] = stream.time_series[segment][order]
    return stream
|
||
|
||
def _find_segment_indices(b_breaks): | ||
"""Convert boundary breaks array to segment indices. | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.