Skip to content

Commit 5610605

Browse files
teonbrookslarsoner
authored andcommitted
[MRG] LSLClient (#6120)
* working on the realtime client * new base class for rtclient * updates to the base client * add import * first pass at lsl in mne * UPDATE lsl client, base_client, FIX epochs * decouple the refactor from the LSLClient implementation this reverts the attempt to both add an LSLClient and create a BaseClient class. For this upcoming PR, the focus will be adding support to LSL. subsequent PRs can be done to refactor the realtime module * update to lsl client at the moment, I am able to connect to a stream. I need to test whether I am able to read the data from the stream. opened an issue on pylsl to inquire whether i can both send and receive on same machine for local dev purpose (labstreaminglayer/pylsl#10) once I can confirm the data is coming in properly, i will add some tests * Update lsl_client.py mistakenly tried to create inlets twice. this is the mvp for the LSL * code cleanup and the beginning of tests for the lslclient * Update test_lsl_client.py * starting to remember the pythonic way of coding been living in the world of javascript. feeling like a wayward youth * simplify time * Update lsl_client.py * address some comment in the pr change the pattern of checking for the import following common pattern seen in mne * [FIX] _testing.py: copy pasted but never corrected * Update test_lsl_client.py forgot that I changed the class signature. If this doesn't work, I will implement multiprocessing to create the LSL Stream for the test * testing is very useful just caught the fact that I didn't change the base class to take the private connect function * FIX arg in _connect; Implement thread to test Fix a typo in the LSLClient._connect which led to downstream errors. * address styling * replace thread with process for clean exit * linting complete * FIX: Add to two other req lists * FIX: Not on PyPi for Linux
1 parent d3747b5 commit 5610605

13 files changed

+414
-7
lines changed

README.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ For full functionality, some functions require:
9393
- Picard >= 0.3
9494
- CuPy >= 4.0 (for NVIDIA CUDA acceleration)
9595
- DIPY >= 0.10.1
96+
- PyLSL >= 1.13.1
9697

9798
Contributing to MNE-Python
9899
^^^^^^^^^^^^^^^^^^^^^^^^^^

doc/python_reference.rst

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -948,10 +948,11 @@ Realtime
948948
.. autosummary::
949949
:toctree: generated/
950950

951+
FieldTripClient
952+
LSLClient
953+
MockRtClient
951954
RtEpochs
952955
RtClient
953-
MockRtClient
954-
FieldTripClient
955956
StimServer
956957
StimClient
957958

doc/whats_new.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ Changelog
9595

9696
- Improved clicking in :meth:`mne.io.Raw.plot` (left click on trace toggles bad, left click on background sets green line, right click anywhere removes green line) by `Clemens Brunner`_
9797

98+
- Add :class:`mne.realtime.LSLClient` for realtime data acquisition with LSL streams of data by `Teon Brooks`_ and `Mainak Jas`_
99+
98100
Bug
99101
~~~
100102
- Fix filtering functions (e.g., :meth:`mne.io.Raw.filter`) to properly take into account the two elements in ``n_pad`` parameter by `Bruno Nicenboim`_

environment.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,4 @@ dependencies:
4545
- pydocstyle
4646
- codespell
4747
- python-picard
48+
- pylsl>=1.13.1

mne/realtime/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from .client import RtClient
1111
from .epochs import RtEpochs
12+
from .lsl_client import LSLClient
1213
from .mockclient import MockRtClient
1314
from .fieldtrip_client import FieldTripClient
1415
from .stim_server_client import StimServer, StimClient

mne/realtime/base_client.py

Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
# Authors: Teon Brooks <teon.brooks@gmail.com>
2+
# Mainak Jas <mainakjas@gmail.com>
3+
#
4+
# License: BSD (3-clause)
5+
6+
import threading
7+
import time
8+
import numpy as np
9+
10+
from ..utils import logger, fill_doc
11+
12+
13+
def _buffer_recv_worker(client):
14+
"""Worker thread that constantly receives buffers."""
15+
try:
16+
for raw_buffer in client.iter_raw_buffers():
17+
client._push_raw_buffer(raw_buffer)
18+
except RuntimeError as err:
19+
# something is wrong, the server stopped (or something)
20+
client._recv_thread = None
21+
print('Buffer receive thread stopped: %s' % err)
22+
23+
24+
class _BaseClient(object):
25+
"""Base Realtime Client.
26+
27+
Parameters
28+
----------
29+
info : instance of mne.Info | None
30+
The measurement info read in from a file. If None, it is generated from
31+
the realtime stream. This method may result in less info than expected.
32+
host : str
33+
The identifier of the server. IP address, LSL id, or raw filename.
34+
port : int | None
35+
Port to use for the connection.
36+
wait_max : float
37+
Maximum time (in seconds) to wait for real-time buffer to start.
38+
tmin : float | None
39+
Time instant to start receiving buffers. If None, start from the latest
40+
samples available.
41+
tmax : float
42+
Time instant to stop receiving buffers.
43+
buffer_size : int
44+
Size of each buffer in terms of number of samples.
45+
verbose : bool, str, int, or None
46+
If not None, override default verbose level (see :func:`mne.verbose`
47+
and :ref:`Logging documentation <tut_logging>` for more).
48+
"""
49+
50+
def __init__(self, info=None, host='localhost', port=None,
51+
wait_max=10., tmin=None, tmax=np.inf,
52+
buffer_size=1000, verbose=None): # noqa: D102
53+
self.info = info
54+
self.host = host
55+
self.port = port
56+
self.wait_max = wait_max
57+
self.tmin = tmin
58+
self.tmax = tmax
59+
self.buffer_size = buffer_size
60+
self.verbose = verbose
61+
62+
def __enter__(self): # noqa: D105
63+
64+
# connect to buffer
65+
logger.info("Client: Waiting for server to start")
66+
start_time = time.time()
67+
while time.time() < (start_time + self.wait_max):
68+
try:
69+
self._connect()
70+
logger.info("Client: Connected")
71+
break
72+
except Exception:
73+
time.sleep(0.1)
74+
else:
75+
raise RuntimeError('Could not connect to Client.')
76+
77+
if not self.info:
78+
self.info = self._create_info()
79+
self._enter_extra()
80+
81+
return self
82+
83+
def __exit__(self, type, value, traceback):
84+
self._disconnect()
85+
86+
return self
87+
88+
@fill_doc
89+
def get_data_as_epoch(self, n_samples=1024, picks=None):
90+
"""Return last n_samples from current time.
91+
92+
Parameters
93+
----------
94+
n_samples : int
95+
Number of samples to fetch.
96+
%(picks_all)s
97+
98+
Returns
99+
-------
100+
epoch : instance of Epochs
101+
The samples fetched as an Epochs object.
102+
103+
See Also
104+
--------
105+
mne.Epochs.iter_evoked
106+
"""
107+
pass
108+
109+
def get_measurement_info(self):
110+
"""Return the measurement info.
111+
112+
Returns
113+
-------
114+
self.info : dict
115+
The measurement info.
116+
"""
117+
return self.info
118+
119+
def iter_raw_buffers(self):
120+
"""Return an iterator over raw buffers."""
121+
pass
122+
123+
def register_receive_callback(self, callback):
124+
"""Register a raw buffer receive callback.
125+
126+
Parameters
127+
----------
128+
callback : callable
129+
The callback. The raw buffer is passed as the first parameter
130+
to callback.
131+
"""
132+
if callback not in self._recv_callbacks:
133+
self._recv_callbacks.append(callback)
134+
135+
def start_receive_thread(self, nchan):
136+
"""Start the receive thread.
137+
138+
If the measurement has not been started, it will also be started.
139+
140+
Parameters
141+
----------
142+
nchan : int
143+
The number of channels in the data.
144+
"""
145+
if self._recv_thread is None:
146+
147+
self._recv_thread = threading.Thread(target=_buffer_recv_worker,
148+
args=(self, ))
149+
self._recv_thread.daemon = True
150+
self._recv_thread.start()
151+
152+
def stop_receive_thread(self, stop_measurement=False):
153+
"""Stop the receive thread.
154+
155+
Parameters
156+
----------
157+
stop_measurement : bool
158+
Also stop the measurement.
159+
"""
160+
if self._recv_thread is not None:
161+
self._recv_thread.stop()
162+
self._recv_thread = None
163+
164+
def unregister_receive_callback(self, callback):
165+
"""Unregister a raw buffer receive callback.
166+
167+
Parameters
168+
----------
169+
callback : callable
170+
The callback to unregister.
171+
"""
172+
if callback in self._recv_callbacks:
173+
self._recv_callbacks.remove(callback)
174+
175+
def _connect(self):
176+
"""Connect to client device."""
177+
pass
178+
179+
def _create_info(self):
180+
"""Create an mne.Info class for connection to client."""
181+
pass
182+
183+
def _disconnect(self):
184+
"""Disconnect the client device."""
185+
pass
186+
187+
def _enter_extra(self):
188+
"""Run additional commands in __enter__.
189+
190+
For system-specific loading and initializing after connect but
191+
during the enter.
192+
193+
"""
194+
pass
195+
196+
def _push_raw_buffer(self, raw_buffer):
197+
"""Push raw buffer to clients using callbacks."""
198+
for callback in self._recv_callbacks:
199+
callback(raw_buffer)

mne/realtime/fieldtrip_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ def __enter__(self): # noqa: D105
107107
else:
108108
logger.info("FieldTripClient: Header retrieved")
109109

110-
self.info = self._guess_measurement_info()
110+
self.info = self._create_info()
111111
self.ch_names = self.ft_header.labels
112112

113113
# find start and end samples
@@ -129,7 +129,7 @@ def __enter__(self): # noqa: D105
129129
def __exit__(self, type, value, traceback): # noqa: D105
130130
self.ft_client.disconnect()
131131

132-
def _guess_measurement_info(self):
132+
def _create_info(self):
133133
"""Create a minimal Info dictionary for epoching, averaging, etc."""
134134
if self.info is None:
135135
warn('Info dictionary not provided. Trying to guess it from '

mne/realtime/lsl_client.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
# Authors: Teon Brooks <teon.brooks@gmail.com>
2+
# Mainak Jas <mainakjas@gmail.com>
3+
#
4+
# License: BSD (3-clause)
5+
6+
import numpy as np
7+
8+
from .base_client import _BaseClient
9+
from ..epochs import EpochsArray
10+
from ..io.meas_info import create_info
11+
from ..io.pick import _picks_to_idx, pick_info
12+
from ..utils import fill_doc, _check_pylsl_installed
13+
14+
15+
class LSLClient(_BaseClient):
16+
"""LSL Realtime Client.
17+
18+
Parameters
19+
----------
20+
info : instance of mne.Info | None
21+
The measurement info read in from a file. If None, it is generated from
22+
the LSL stream. This method may result in less info than expected. If
23+
the channel type is EEG, the `standard_1005` montage is used for
24+
electrode location.
25+
host : str
26+
The LSL identifier of the server.
27+
port : int | None
28+
Port to use for the connection.
29+
wait_max : float
30+
Maximum time (in seconds) to wait for real-time buffer to start
31+
tmin : float | None
32+
Time instant to start receiving buffers. If None, start from the latest
33+
samples available.
34+
tmax : float
35+
Time instant to stop receiving buffers.
36+
buffer_size : int
37+
Size of each buffer in terms of number of samples.
38+
verbose : bool, str, int, or None
39+
If not None, override default verbose level (see :func:`mne.verbose`
40+
and :ref:`Logging documentation <tut_logging>` for more).
41+
"""
42+
43+
@fill_doc
44+
def get_data_as_epoch(self, n_samples=1024, picks=None):
45+
"""Return last n_samples from current time.
46+
47+
Parameters
48+
----------
49+
n_samples : int
50+
Number of samples to fetch.
51+
%(picks_all)s
52+
53+
Returns
54+
-------
55+
epoch : instance of Epochs
56+
The samples fetched as an Epochs object.
57+
58+
See Also
59+
--------
60+
mne.Epochs.iter_evoked
61+
"""
62+
# set up timeout in case LSL process hang. wait arb 5x expected time
63+
wait_time = n_samples * 5. / self.info['sfreq']
64+
65+
# create an event at the start of the data collection
66+
events = np.expand_dims(np.array([0, 1, 1]), axis=0)
67+
samples, _ = self.client.pull_chunk(max_samples=n_samples,
68+
timeout=wait_time)
69+
data = np.vstack(samples).T
70+
71+
picks = _picks_to_idx(self.info, picks, 'all', exclude=())
72+
info = pick_info(self.info, picks)
73+
return EpochsArray(data[picks][np.newaxis], info, events)
74+
75+
def iter_raw_buffers(self):
76+
"""Return an iterator over raw buffers."""
77+
pylsl = _check_pylsl_installed(strict=True)
78+
inlet = pylsl.StreamInlet(self.client)
79+
80+
while True:
81+
samples, _ = inlet.pull_chunk(max_samples=self.buffer_size)
82+
83+
yield np.vstack(samples).T
84+
85+
def _connect(self):
86+
pylsl = _check_pylsl_installed(strict=True)
87+
stream_info = pylsl.resolve_byprop('source_id', self.host,
88+
timeout=self.wait_max)[0]
89+
self.client = pylsl.StreamInlet(info=stream_info,
90+
max_buflen=self.buffer_size)
91+
92+
return self
93+
94+
def _create_info(self):
95+
montage = None
96+
sfreq = self.client.info().nominal_srate()
97+
98+
lsl_info = self.client.info()
99+
ch_info = lsl_info.desc().child("channels").child("channel")
100+
ch_names = list()
101+
ch_types = list()
102+
ch_type = lsl_info.type().lower()
103+
for k in range(1, lsl_info.channel_count() + 1):
104+
ch_names.append(ch_info.child_value("label") or
105+
'{} {:03d}'.format(ch_type.upper(), k))
106+
ch_types.append(ch_info.child_value("type") or ch_type)
107+
ch_info = ch_info.next_sibling()
108+
if ch_type == "eeg":
109+
try:
110+
montage = 'standard_1005'
111+
info = create_info(ch_names, sfreq, ch_types, montage=montage)
112+
except ValueError:
113+
info = create_info(ch_names, sfreq, ch_types)
114+
115+
return info
116+
117+
def _disconnect(self):
118+
self.client.close_stream()
119+
120+
return self

0 commit comments

Comments
 (0)