Optionally handle non-monotonic time-series #132

Open · wants to merge 4 commits into main

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@
### Added
- Add new `case_sensitive` parameter to `match_streaminfos`, defaulting to `False` to maintain previous behavior; when `False`, stream properties are matched more leniently ([#134](https://github.com/xdf-modules/pyxdf/pull/134) by [Stefan Appelhoff](https://github.com/sappelhoff))
- Expose detected clock segments (used in synchronisation) as `stream["info"]["clock_segments"]` ([#131](https://github.com/xdf-modules/pyxdf/pull/131) by [Jamie Forth](https://github.com/jamieforth))
- Add new `handle_non_monotonic` parameter to `load_xdf`, defaulting to `"warn"` to maintain previous behavior with additional warning when non-monotonic data are detected; when `None` check is bypassed entirely. Non-monotonic data are handled by sorting time-stamps (`"trust_timeseries"`) or by re-ordering samples according to sorted time-stamps (`"trust-timestamps"`) ([#132](https://github.com/xdf-modules/pyxdf/pull/132) by [Jamie Forth](https://github.com/jamieforth))
Suggested change
- Add new `handle_non_monotonic` parameter to `load_xdf`, defaulting to `"warn"` to maintain previous behavior with additional warning when non-monotonic data are detected; when `None` check is bypassed entirely. Non-monotonic data are handled by sorting time-stamps (`"trust_timeseries"`) or by re-ordering samples according to sorted time-stamps (`"trust-timestamps"`) ([#132](https://github.com/xdf-modules/pyxdf/pull/132) by [Jamie Forth](https://github.com/jamieforth))
- Add new `handle_non_monotonic` parameter to `load_xdf`, defaulting to `"warn"` to maintain previous behavior but with an additional warning when non-monotonic data are detected; if `None`, the check is bypassed entirely. Non-monotonic data are handled by sorting time-stamps (`"trust_timeseries"`) or by re-ordering samples according to sorted time-stamps (`"trust_timestamps"`) ([#132](https://github.com/xdf-modules/pyxdf/pull/132) by [Jamie Forth](https://github.com/jamieforth))
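
For context, a minimal usage sketch of the new option (the file name is a placeholder):

```python
import pyxdf

# Re-order samples so each sample stays aligned with its sorted timestamp;
# the default "warn" only logs a warning, and None skips the check entirely.
streams, header = pyxdf.load_xdf(
    "recording.xdf",  # hypothetical file
    handle_non_monotonic="trust_timestamps",
)
```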


### Changed
- Segment at negative time intervals when dejittering ([#130](https://github.com/xdf-modules/pyxdf/pull/130) by [Jamie Forth](https://github.com/jamieforth))
215 changes: 214 additions & 1 deletion src/pyxdf/pyxdf.py
@@ -15,7 +15,8 @@
import itertools
import logging
import struct
from collections import OrderedDict, defaultdict
from collections import OrderedDict, defaultdict, namedtuple
from enum import Enum
from pathlib import Path
from xml.etree.ElementTree import ParseError, fromstring

@@ -26,6 +27,13 @@
logger = logging.getLogger(__name__)


class HandleNonMonoState(Enum):
    DISABLE = None
    WARN = "warn"
    TRUST_TIMESERIES = "trust_timeseries"
    TRUST_TIMESTAMPS = "trust_timestamps"


class StreamData:
    """Temporary per-stream data."""

@@ -77,6 +85,7 @@ def load_xdf(
    select_streams=None,
    *,
    on_chunk=None,
    handle_non_monotonic="warn",
    synchronize_clocks=True,
    handle_clock_resets=True,
    dejitter_timestamps=True,
@@ -120,6 +129,15 @@
    verbose : Passing True will set logging level to DEBUG, False will set it to
        WARNING, and None will use root logger level. (default: None)

    handle_non_monotonic : str | None
        Whether to warn only or to ensure data are in ascending time order.
        - "warn": check only, warning when data are non-monotonic (default)
        - "trust_timeseries": check and sort non-monotonic time-stamps only,
          preserving sample sequence order.
        - "trust_timestamps": check and sort non-monotonic time-stamps and samples,
          preserving time-stamp and sample alignment.
        - None: disable the non-monotonicity check.

    synchronize_clocks : Whether to enable clock synchronization based on
        ClockOffset chunks. (default: true)

@@ -202,6 +220,9 @@

logger.info("Importing XDF file %s..." % filename)

    # Validate handle_non_monotonic argument.
    handle_non_monotonic = HandleNonMonoState(handle_non_monotonic)
Comment on lines +223 to +224
Although this validates the input, the error message you get for an unsupported value is not very helpful (ValueError: 'whatever' is not a valid HandleNonMonoState). I would handle (no pun intended) this with a custom more specific error message, which probably makes the use of an Enum unnecessary.
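
One shape the suggested validation could take (a sketch of the reviewer's idea, not part of the diff), replacing the Enum with a plain tuple of allowed values:

```python
# Hypothetical helper; names are illustrative only.
_HANDLE_NON_MONOTONIC_VALUES = (None, "warn", "trust_timeseries", "trust_timestamps")

def _validate_handle_non_monotonic(value):
    if value not in _HANDLE_NON_MONOTONIC_VALUES:
        raise ValueError(
            "handle_non_monotonic must be None, 'warn', 'trust_timeseries' "
            f"or 'trust_timestamps', got {value!r}."
        )
    return value
```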


    # if select_streams is an int or a list of int, load only streams associated with
    # the corresponding stream IDs
    # if select_streams is a list of dicts, use this to query and load streams
@@ -356,6 +377,34 @@
        else:
            stream.time_series = np.zeros((0, stream.nchns))

    # Perform initial non-monotonicity checks if requested
    if handle_non_monotonic is HandleNonMonoState.DISABLE:
        logger.info(" skipping non-monotonicity check...")
    else:
        logger.info(" performing non-monotonicity check...")
        mono_status = _check_monotonicity(temp)
        # Are all time-values monotonic across all streams?
        time_stamps_mono = all(mono_status["time_stamps"].values())
        clock_times_mono = all(mono_status["clock_times"].values())
        if time_stamps_mono and clock_times_mono:
            # All streams are monotonic.
            logger.info("All streams are monotonic, continuing...")
        elif handle_non_monotonic is HandleNonMonoState.WARN:
            # Some data are non-monotonic, but we will not attempt to handle it.
            if not synchronize_clocks and not clock_times_mono:
                msg = (
                    "Clock offsets are non-monotonic but clocks are not going to "
                    "be synchronized. Consider loading with 'synchronize_clocks=True'."
                )
                logger.warning(msg)
            else:
                msg = (
                    "Non-monotonic streams detected - "
                    "consider loading with handle_non_monotonic='trust_timeseries' "
                    "or 'trust_timestamps'"
                )
                logger.warning(msg)

    # perform (fault-tolerant) clock synchronization if requested
    if synchronize_clocks:
        logger.info(" performing clock synchronization...")
@@ -369,6 +418,19 @@
            winsor_threshold,
        )

    # perform non-monotonicity handling if requested
    if (
        handle_non_monotonic is HandleNonMonoState.TRUST_TIMESERIES
        or handle_non_monotonic is HandleNonMonoState.TRUST_TIMESTAMPS
    ):
        logger.info(f" sorting non-monotonic data: {handle_non_monotonic.name}...")
        for stream_id, stream in temp.items():
            _sort_stream_data(
                stream_id,
                stream,
                handle_non_monotonic is HandleNonMonoState.TRUST_TIMESTAMPS,
            )

    # perform jitter removal if requested
    if dejitter_timestamps:
        logger.info(" performing jitter removal...")
@@ -542,6 +604,157 @@ def _scan_forward(f):
    return False


# Named tuple to represent monotonicity information.
Monotonicity = namedtuple("Monotonicity", ["result", "n", "dec_count", "eq_count"])
Does this need to be public? Also, and especially if this is just private, does this need to be a NamedTuple?
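
If it stays private, one alternative hinted at here (a sketch, not part of the PR) is a frozen dataclass, which documents the fields without the tuple interface:

```python
from dataclasses import dataclass

# Hypothetical private replacement for the module-level namedtuple.
@dataclass(frozen=True)
class _Monotonicity:
    result: bool    # True if all pairwise values are equal or increasing
    n: int          # total number of pairwise intervals
    dec_count: int  # number of decreasing intervals
    eq_count: int   # number of equal intervals
```

Call sites that tuple-unpack the result would then need to access the fields by name instead.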

# - result: bool - True if all pairwise values are equal or increasing.
# - n: total number of pairwise values
# - dec_count: number of pairwise decreasing values
# - eq_count: number of pairwise equal values


def _monotonic_increasing(a):
    """Test for increasing (non-decreasing) monotonicity.

    Parameters
    ----------
    a : array_like

    Returns
    -------
    Monotonicity : namedtuple
    """
    # Intervals between successive values.
    diffs = np.diff(a)

    increasing = True
    n = len(diffs)
    dec_count = 0
    eq_count = 0

    # Count non-increasing intervals.
    non_inc_count = np.sum(diffs <= 0)

    if non_inc_count > 0:
        # Count constant intervals.
        eq_count = np.sum(diffs == 0).item()
        dec_count = (non_inc_count - eq_count).item()

        if dec_count > 0:
            increasing = False
    return Monotonicity(increasing, n, dec_count, eq_count)
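
A quick sanity check of the counting logic, using the PR's own helper on a toy array:

```python
import numpy as np

# diffs of [0, 1, 1, 0.5] are [1, 0, -0.5]: one equal and one decreasing interval.
info = _monotonic_increasing(np.array([0.0, 1.0, 1.0, 0.5]))
assert info == Monotonicity(result=False, n=3, dec_count=1, eq_count=1)
```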


def _check_monotonicity(streams):
    """Check monotonicity of all stream time values.

    Parameters
    ----------
    streams : dict
        Dictionary of stream_id: StreamData.

    Returns
    -------
    mono_status : dict[dict]
        Dictionary of attr: {stream_id: bool}
        - attr: StreamData attribute name
        - stream_id: Stream ID
        - bool: stream attr data are monotonic
    """
    mono_status = {
        "time_stamps": {},
        "clock_times": {},
    }

    max_stream_id = max(streams.keys())
    id_align = len(str(max_stream_id))

    for stream_id, stream in streams.items():
        for attr in mono_status.keys():
            monotonic, n, dec_count, eq_count = _monotonic_increasing(
                getattr(stream, attr)
            )

            mono_status[attr][stream_id] = monotonic

            msg = [
                (
                    f"Stream {stream_id:>{id_align}}: {attr} are"
                    f"{'' if monotonic else ' NOT'} monotonic"
                )
            ]

            if dec_count > 0:
                dec_pc = round(dec_count / n * 100, 1)
                msg.append(f"dec={dec_count} ({'<0.1' if dec_pc < 0.1 else dec_pc}%)")

            if eq_count > 0:
                eq_pc = round(eq_count / n * 100, 1)
                msg.append(f"eq={eq_count} ({'<0.1' if eq_pc < 0.1 else eq_pc}%)")

            msg = ", ".join(msg)

            if monotonic:
                logger.info(msg)
            else:
                logger.warning(msg)
    return mono_status
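
For a hypothetical recording with two streams where only stream 2 has out-of-order timestamps, the returned structure would look like:

```python
{
    "time_stamps": {1: True, 2: False},
    "clock_times": {1: True, 2: True},
}
```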


def _sort_stream_data(stream_id, stream, reorder_timeseries=False):
    """Sort stream timestamps and optionally reorder timeseries data.

    Parameters
    ----------
    stream_id : int
    stream : StreamData (possibly non-monotonic)
    reorder_timeseries : bool (default: False)

    Returns
    -------
    stream : StreamData (monotonic)
        With sorted timestamps and optionally reordered timeseries data.

    Stable sorts non-monotonic ground-truth timestamps in ascending order. When
    clock resets have been detected, sorting is applied within each clock
    segment, but clock segments themselves are not sorted.

    When `reorder_timeseries=False` only the timestamp array is modified.

    When `reorder_timeseries=True` both timestamp and timeseries arrays are
    modified, keeping all samples aligned with their original timestamp.

    Monotonic streams/segments are not modified.

    Streams may still contain identically timestamped samples, but these can be
    handled by dejittering.
    """
    if len(stream.time_stamps) <= 1:
        return stream
    clock_segments = stream.clock_segments
    if len(clock_segments) == 0:
        # Clocks have not been synchronized.
        clock_segments = [(0, len(stream.time_stamps) - 1)]  # inclusive
    for start_i, stop_i in clock_segments:
        ts_slice = slice(start_i, stop_i + 1)
        if not _monotonic_increasing(stream.time_stamps[ts_slice]).result:
            logger.info(
                f"Sorting stream {stream_id}: clock segment {start_i}-{stop_i}."
            )
            # Determine monotonic timestamp ordering.
            ind = np.argsort(stream.time_stamps[ts_slice], kind="stable")
            # Reorder timestamps in place.
            stream.time_stamps[ts_slice] = stream.time_stamps[ts_slice][ind]
            if reorder_timeseries:
                # Reorder timeseries data to align with timestamps.
                if stream.fmt == "string":
                    stream.time_series[ts_slice] = np.array(
                        stream.time_series[ts_slice]
                    )[ind].tolist()
                else:
                    stream.time_series[ts_slice] = stream.time_series[ts_slice][ind]
    return stream
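
The stable sort is what preserves the relative order of identically timestamped samples; a small NumPy illustration:

```python
import numpy as np

ts = np.array([0.0, 2.0, 1.0, 1.0, 3.0])
ind = np.argsort(ts, kind="stable")
print(ts[ind])  # [0. 1. 1. 2. 3.]; the two 1.0 samples keep their original order
```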


def _find_segment_indices(b_breaks):
    """Convert boundary breaks array to segment indices.
