Skip to content

Commit 70b32dd

Browse files
feat: add MDARunner in experimental namespace (#197)
* feat: add runner from pymmcore-plus * add dep * fix hint * try fix 3.8 * style(pre-commit.ci): auto fixes [...] * more coverage * change namespace to experimental --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent a5819ef commit 70b32dd

File tree

6 files changed

+667
-2
lines changed

6 files changed

+667
-2
lines changed

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ dependencies = ["pydantic >=2.6", "numpy", "typing-extensions"]
4242
# https://peps.python.org/pep-0621/#dependencies-optional-dependencies
4343
[project.optional-dependencies]
4444
yaml = ["PyYAML"]
45-
test = ["pytest>=6.0", "pytest-cov", "PyYAML"]
45+
test = ["pytest>=6.0", "pytest-cov", "PyYAML", "psygnal"]
4646
dev = [
4747
"ipython",
4848
"mypy",
@@ -103,7 +103,7 @@ ignore = [
103103
keep-runtime-typing = true
104104

105105
[tool.ruff.lint.per-file-ignores]
106-
"tests/*.py" = ["D", "S101", "E501"]
106+
"tests/*.py" = ["D", "S101", "E501", "SLF"]
107107

108108
[tool.ruff.lint.flake8-tidy-imports]
109109
# Disallow all relative imports.

src/useq/experimental/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
"""MDARunner class for running an Iterable[MDAEvent]."""
2+
3+
from useq.experimental._runner import MDARunner
4+
from useq.experimental.protocols import PMDAEngine
5+
6+
__all__ = ["MDARunner", "PMDAEngine"]

src/useq/experimental/_runner.py

Lines changed: 348 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
import time
5+
import warnings
6+
from contextlib import contextmanager
7+
from typing import TYPE_CHECKING
8+
from unittest.mock import MagicMock
9+
10+
from useq._mda_sequence import MDASequence
11+
from useq.experimental.protocols import PMDAEngine, PMDASignaler
12+
13+
if TYPE_CHECKING:
14+
from collections.abc import Iterable, Iterator
15+
16+
from useq import MDAEvent
17+
18+
19+
MSG = (
20+
"This sequence is a placeholder for a generator of events with unknown "
21+
"length & shape. Iterating over it has no effect."
22+
)
23+
24+
25+
@contextmanager
26+
def _exceptions_logged(logger: logging.Logger) -> Iterator[None]:
27+
"""Context manager to log exceptions."""
28+
try:
29+
yield
30+
except Exception as e:
31+
logger.error(e)
32+
33+
34+
class GeneratorMDASequence(MDASequence):
35+
axis_order: tuple[str, ...] = ()
36+
37+
@property
38+
def sizes(self) -> dict[str, int]: # pragma: no cover
39+
warnings.warn(MSG, stacklevel=2)
40+
return {}
41+
42+
def iter_axis(self, axis: str) -> Iterator: # pragma: no cover
43+
warnings.warn(MSG, stacklevel=2)
44+
yield from []
45+
46+
def __str__(self) -> str: # pragma: no cover
47+
return "GeneratorMDASequence()"
48+
49+
50+
class MDARunner:
51+
"""Object that executes a multi-dimensional experiment using an MDAEngine.
52+
53+
This object is available at [`CMMCorePlus.mda`][pymmcore_plus.CMMCorePlus.mda].
54+
55+
This is the main object that runs a multi-dimensional experiment; it does so by
56+
driving an acquisition engine that implements the
57+
[`PMDAEngine`][pymmcore_plus.mda.PMDAEngine] protocol. It emits signals at specific
58+
times during the experiment (see
59+
[`PMDASignaler`][pymmcore_plus.mda.events.PMDASignaler] for details on the signals
60+
that are available to connect to and when they are emitted).
61+
"""
62+
63+
def __init__(
64+
self, signal_emitter: PMDASignaler, logger: logging.Logger | None = None
65+
) -> None:
66+
self._engine: PMDAEngine | None = None
67+
self._signals = signal_emitter
68+
self._logger = logger or logging.getLogger(__name__)
69+
70+
self._running = False
71+
self._paused = False
72+
self._paused_time: float = 0
73+
self._pause_interval: float = 0.1 # sec to wait between checking pause state
74+
75+
self._canceled = False
76+
self._sequence: MDASequence | None = None
77+
# timer for the full sequence, reset only once at the beginning of the sequence
78+
self._sequence_t0: float = 0.0
79+
# event clock, reset whenever `event.reset_event_timer` is True
80+
self._t0: float = 0.0
81+
82+
def set_engine(self, engine: PMDAEngine) -> PMDAEngine | None:
83+
"""Set the [`PMDAEngine`][pymmcore_plus.mda.PMDAEngine] to use for the MDA run.""" # noqa: E501
84+
# MagicMock on py312 no longer satisfies isinstance ... so we explicitly
85+
# allow it here just for the sake of testing.
86+
if not isinstance(engine, (PMDAEngine, MagicMock)): # pragma: no cover
87+
raise TypeError("Engine does not conform to the Engine protocol.")
88+
89+
if self.is_running(): # pragma: no cover
90+
raise RuntimeError(
91+
"Cannot register a new engine when the current engine is running "
92+
"an acquisition. Please cancel the current engine's acquisition "
93+
"before registering"
94+
)
95+
96+
old_engine, self._engine = self._engine, engine
97+
return old_engine
98+
99+
@property
100+
def engine(self) -> PMDAEngine | None:
101+
"""The [`PMDAEngine`][pymmcore_plus.mda.PMDAEngine] that is currently being used.""" # noqa: E501
102+
return self._engine
103+
104+
@property
105+
def events(self) -> PMDASignaler:
106+
"""Signals that are emitted during the MDA run.
107+
108+
See [`PMDASignaler`][pymmcore_plus.mda.PMDASignaler] for details on the
109+
signals that are available to connect to.
110+
"""
111+
return self._signals
112+
113+
def is_running(self) -> bool:
114+
"""Return True if an acquisition is currently underway.
115+
116+
This will return True at any point between the emission of the
117+
[`sequenceStarted`][pymmcore_plus.mda.PMDASignaler.sequenceStarted] and
118+
[`sequenceFinished`][pymmcore_plus.mda.PMDASignaler.sequenceFinished] signals,
119+
including when the acquisition is currently paused.
120+
121+
Returns
122+
-------
123+
bool
124+
Whether an acquisition is underway.
125+
"""
126+
return self._running
127+
128+
def is_paused(self) -> bool:
129+
"""Return True if the acquisition is currently paused.
130+
131+
Use `toggle_pause` to change the paused state.
132+
133+
Returns
134+
-------
135+
bool
136+
Whether the current acquisition is paused.
137+
"""
138+
return self._paused
139+
140+
def cancel(self) -> None:
141+
"""Cancel the currently running acquisition.
142+
143+
This is a no-op if no acquisition is currently running.
144+
If an acquisition is running then this will cancel the acquisition and
145+
a sequenceCanceled signal, followed by a sequenceFinished signal will
146+
be emitted.
147+
"""
148+
self._canceled = True
149+
self._paused_time = 0
150+
151+
def toggle_pause(self) -> None:
152+
"""Toggle the paused state of the current acquisition.
153+
154+
To get whether the acquisition is currently paused use the
155+
[`is_paused`][pymmcore_plus.mda.MDARunner.is_paused] method. This method is a
156+
no-op if no acquisition is currently underway.
157+
"""
158+
if self.is_running():
159+
self._paused = not self._paused
160+
self._signals.sequencePauseToggled.emit(self._paused)
161+
162+
def run(
163+
self,
164+
events: Iterable[MDAEvent],
165+
) -> None:
166+
"""Run the multi-dimensional acquisition defined by `sequence`.
167+
168+
Most users should not use this directly as it will block further
169+
execution. Instead, use the
170+
[`CMMCorePlus.run_mda`][pymmcore_plus.CMMCorePlus.run_mda] method which will
171+
run on a thread.
172+
173+
Parameters
174+
----------
175+
events : Iterable[MDAEvent]
176+
An iterable of `useq.MDAEvents` objects to execute.
177+
"""
178+
error = None
179+
sequence = events if isinstance(events, MDASequence) else GeneratorMDASequence()
180+
# NOTE: it's important that `_prepare_to_run` and `_finish_run` are
181+
# called inside the context manager, since the `mda_listeners_connected`
182+
# context manager expects to see both of those signals.
183+
try:
184+
engine = self._prepare_to_run(sequence)
185+
self._run(engine, events)
186+
except Exception as e:
187+
error = e
188+
with _exceptions_logged(self._logger):
189+
self._finish_run(sequence)
190+
if error is not None:
191+
raise error
192+
193+
def seconds_elapsed(self) -> float:
194+
"""Return the number of seconds since the start of the acquisition."""
195+
return time.perf_counter() - self._sequence_t0
196+
197+
def event_seconds_elapsed(self) -> float:
198+
"""Return the number of seconds on the "event clock".
199+
200+
This is the time since either the start of the acquisition or the last
201+
event with `reset_event_timer` set to `True`.
202+
"""
203+
return time.perf_counter() - self._t0
204+
205+
def _run(self, engine: PMDAEngine, events: Iterable[MDAEvent]) -> None:
206+
"""Main execution of events, inside the try/except block of `run`."""
207+
teardown_event = getattr(engine, "teardown_event", lambda e: None)
208+
event_iterator = getattr(engine, "event_iterator", iter)
209+
_events: Iterator[MDAEvent] = event_iterator(events)
210+
self._reset_event_timer()
211+
self._sequence_t0 = self._t0
212+
213+
for event in _events:
214+
if event.reset_event_timer:
215+
self._reset_event_timer()
216+
# If cancelled break out of the loop
217+
if self._wait_until_event(event) or not self._running:
218+
break
219+
220+
self._signals.eventStarted.emit(event)
221+
self._logger.info("%s", event)
222+
engine.setup_event(event)
223+
224+
try:
225+
runner_time_ms = self.seconds_elapsed() * 1000
226+
# this is a bit of a hack to pass the time into the engine
227+
# it is used for intra-event time calculations inside the engine.
228+
# we pop it off after the event is executed.
229+
event.metadata["runner_t0"] = self._sequence_t0
230+
output = engine.exec_event(event) or () # in case output is None
231+
for payload in output:
232+
img, event, meta = payload
233+
event.metadata.pop("runner_t0", None)
234+
# if the engine calculated its own time, don't overwrite it
235+
if "runner_time_ms" not in meta:
236+
meta["runner_time_ms"] = runner_time_ms
237+
with _exceptions_logged(self._logger):
238+
self._signals.frameReady.emit(img, event, meta)
239+
finally:
240+
teardown_event(event)
241+
242+
def _prepare_to_run(self, sequence: MDASequence) -> PMDAEngine:
243+
"""Set up for the MDA run.
244+
245+
Parameters
246+
----------
247+
sequence : MDASequence
248+
The sequence of events to run.
249+
"""
250+
if not self._engine: # pragma: no cover
251+
raise RuntimeError("No MDAEngine set.")
252+
253+
self._running = True
254+
self._paused = False
255+
self._paused_time = 0.0
256+
self._sequence = sequence
257+
258+
meta = self._engine.setup_sequence(sequence)
259+
self._signals.sequenceStarted.emit(sequence, meta or {})
260+
self._logger.info("MDA Started: %s", sequence)
261+
return self._engine
262+
263+
def _reset_event_timer(self) -> None:
264+
self._t0 = time.perf_counter() # reference time, in seconds
265+
266+
def _check_canceled(self) -> bool:
267+
"""Return True if the cancel method has been called and emit relevant signals.
268+
269+
If cancelled, this relies on the `self._sequence` being the current sequence
270+
in order to emit a `sequenceCanceled` signal.
271+
272+
Returns
273+
-------
274+
bool
275+
Whether the MDA has been canceled.
276+
"""
277+
if self._canceled:
278+
self._logger.warning("MDA Canceled: %s", self._sequence)
279+
self._signals.sequenceCanceled.emit(self._sequence)
280+
self._canceled = False
281+
return True
282+
return False
283+
284+
def _wait_until_event(self, event: MDAEvent) -> bool:
285+
"""Wait until the event's min start time, checking for pauses cancellations.
286+
287+
Parameters
288+
----------
289+
event : MDAEvent
290+
The event to wait for.
291+
292+
Returns
293+
-------
294+
bool
295+
Whether the MDA was cancelled while waiting.
296+
"""
297+
if not self.is_running():
298+
return False # pragma: no cover
299+
if self._check_canceled():
300+
return True
301+
while self.is_paused() and not self._canceled:
302+
self._paused_time += self._pause_interval # fixme: be more precise
303+
time.sleep(self._pause_interval)
304+
305+
if self._check_canceled():
306+
return True
307+
308+
# FIXME: this is actually the only place where the runner assumes our event is
309+
# an MDAevent. For everything else, the engine is technically the only thing
310+
# that cares about the event time.
311+
# So this whole method could potentially be moved to the engine.
312+
if event.min_start_time:
313+
go_at = event.min_start_time + self._paused_time
314+
# We need to enter a loop here checking paused and canceled.
315+
# otherwise you'll potentially wait a long time to cancel
316+
remaining_wait_time = go_at - self.event_seconds_elapsed()
317+
while remaining_wait_time > 0:
318+
self._signals.awaitingEvent.emit(event, remaining_wait_time)
319+
while self._paused and not self._canceled:
320+
self._paused_time += self._pause_interval # fixme: be more precise
321+
remaining_wait_time += self._pause_interval
322+
time.sleep(self._pause_interval)
323+
324+
if self._canceled:
325+
break
326+
time.sleep(min(remaining_wait_time, 0.5))
327+
remaining_wait_time = go_at - self.event_seconds_elapsed()
328+
329+
# check canceled again in case it was canceled
330+
# during the waiting loop
331+
return self._check_canceled()
332+
333+
def _finish_run(self, sequence: MDASequence) -> None:
334+
"""To be called at the end of an acquisition.
335+
336+
Parameters
337+
----------
338+
sequence : MDASequence
339+
The sequence that was finished.
340+
"""
341+
self._running = False
342+
self._canceled = False
343+
344+
if hasattr(self._engine, "teardown_sequence"):
345+
self._engine.teardown_sequence(sequence) # type: ignore
346+
347+
self._logger.info("MDA Finished: %s", sequence)
348+
self._signals.sequenceFinished.emit(sequence)

0 commit comments

Comments
 (0)