Skip to content

Commit f92d4e2

Browse files
committed
Applying Counter based lsl stream approach to ensure lsl stream sends data with correct sampling rate
1 parent a9b8b9d commit f92d4e2

File tree

1 file changed

+197
-33
lines changed

1 file changed

+197
-33
lines changed

connection.py

Lines changed: 197 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ def __init__(self):
4141
self.rate_window = deque(maxlen=10)
4242
self.last_timestamp = time.perf_counter()
4343
self.rate_update_interval = 0.5
44+
self.ble_samples_received = 0
4445

4546
async def get_ble_device(self):
4647
devices = await Chords_BLE.scan_devices()
@@ -165,6 +166,124 @@ def lsl_rate_checker(self, duration=2.0):
165166
except Exception as e:
166167
print(f"Error in LSL rate check: {str(e)}")
167168

169+
def counter_based_data_handler(self):
170+
last_counter = -1
171+
dropped_samples = 0
172+
total_samples = 0
173+
last_print_time = time.time()
174+
175+
while self.running and self.connection:
176+
try:
177+
raw_sample = self.connection.get_latest_sample()
178+
if not raw_sample:
179+
continue
180+
181+
current_counter = raw_sample[2]
182+
channel_data = raw_sample[3:]
183+
184+
# Handle counter rollover (0-255)
185+
if last_counter != -1:
186+
expected_counter = (last_counter + 1) % 256
187+
188+
if current_counter != expected_counter:
189+
if current_counter > last_counter:
190+
missed = current_counter - last_counter - 1
191+
else:
192+
missed = (256 - last_counter - 1) + current_counter
193+
194+
dropped_samples += missed
195+
print(f"\nWarning: {missed} samples dropped. Counter jump: {last_counter} -> {current_counter}")
196+
197+
# Only process if this is a new sample
198+
if current_counter != last_counter:
199+
total_samples += 1
200+
timestamp = local_clock()
201+
202+
if self.lsl_connection:
203+
self.lsl_connection.push_sample(channel_data, timestamp=timestamp)
204+
205+
if self.recording_active:
206+
self.log_to_csv(channel_data)
207+
208+
last_counter = current_counter
209+
210+
self.update_sample_rate()
211+
212+
# Print stats every 5 seconds
213+
if time.time() - last_print_time > 5:
214+
drop_rate = (dropped_samples / total_samples) * 100 if total_samples > 0 else 0
215+
print(f"\nStats - Processed: {total_samples}, Dropped: {dropped_samples} ({drop_rate:.2f}%)")
216+
last_print_time = time.time()
217+
218+
except Exception as e:
219+
print(f"\nCounter-based handler error: {str(e)}")
220+
print(f"Last counter: {last_counter}, Current counter: {current_counter}")
221+
break
222+
223+
def hybrid_data_handler(self):
224+
last_counter = -1
225+
target_interval = 1.0 / 500.0
226+
last_timestamp = local_clock()
227+
dropped_samples = 0
228+
total_samples = 0
229+
last_print_time = time.time()
230+
231+
while self.running and self.connection:
232+
try:
233+
raw_sample = self.connection.get_latest_sample()
234+
if not raw_sample:
235+
continue
236+
237+
current_counter = raw_sample[2]
238+
channel_data = raw_sample[3:]
239+
240+
if current_counter == last_counter:
241+
continue
242+
243+
current_time = local_clock()
244+
245+
counter_diff = (current_counter - last_counter) % 256
246+
if counter_diff == 0:
247+
counter_diff = 256
248+
249+
# Check for missed samples
250+
if last_counter != -1 and counter_diff > 1:
251+
dropped_samples += (counter_diff - 1)
252+
print(f"\nWarning: {counter_diff - 1} samples dropped. Counter jump: {last_counter} -> {current_counter}")
253+
print(f"Current timestamp: {current_time}")
254+
print(f"Sample data: {channel_data}")
255+
256+
time_per_sample = target_interval
257+
258+
for i in range(counter_diff):
259+
sample_timestamp = last_timestamp + (i + 1) * time_per_sample
260+
261+
# Check if we're falling behind
262+
if local_clock() > sample_timestamp + time_per_sample * 2:
263+
print(f"\nWarning: Falling behind by {local_clock() - sample_timestamp:.4f}s, skipping samples")
264+
break
265+
266+
if self.lsl_connection:
267+
self.lsl_connection.push_sample(channel_data, timestamp=sample_timestamp)
268+
269+
if self.recording_active:
270+
self.log_to_csv(channel_data)
271+
272+
total_samples += 1
273+
274+
last_counter = current_counter
275+
last_timestamp = current_time
276+
277+
if time.time() - last_print_time > 5:
278+
drop_rate = (dropped_samples / total_samples) * 100 if total_samples > 0 else 0
279+
print(f"\nStats - Processed: {total_samples}, Dropped: {dropped_samples} ({drop_rate:.2f}%)")
280+
last_print_time = time.time()
281+
282+
except Exception as e:
283+
print(f"\nHybrid handler error: {str(e)}")
284+
print(f"Last counter: {last_counter}, Current counter: {current_counter}")
285+
break
286+
168287
def ble_data_handler(self):
169288
TARGET_SAMPLE_RATE = 500.0
170289
SAMPLE_INTERVAL = 1.0 / TARGET_SAMPLE_RATE
@@ -265,11 +384,52 @@ def usb_data_handler(self):
265384
if self.recording_active:
266385
self.log_to_csv(channel_data)
267386
except Exception as e:
268-
print(f"USB data handler error: {str(e)}")
387+
print(f"\nUSB data handler error: {str(e)}")
269388
break
270389

390+
def connect_usb_with_counter(self):
391+
self.usb_connection = Chords_USB()
392+
if not self.usb_connection.detect_hardware():
393+
return False
394+
395+
self.num_channels = self.usb_connection.num_channels
396+
self.sampling_rate = self.usb_connection.supported_boards[self.usb_connection.board]["sampling_rate"]
397+
398+
self.setup_lsl(self.num_channels, self.sampling_rate)
399+
self.usb_connection.send_command('START')
400+
401+
self.running = True
402+
self.usb_thread = threading.Thread(target=self.counter_based_data_handler)
403+
self.usb_thread.daemon = True
404+
self.usb_thread.start()
405+
406+
return True
407+
271408
def connect_ble(self, device_address=None):
272409
self.ble_connection = Chords_BLE()
410+
original_notification_handler = self.ble_connection.notification_handler
411+
412+
def notification_handler(sender, data):
413+
if len(data) == self.ble_connection.NEW_PACKET_LEN:
414+
if not self.lsl_connection:
415+
self.setup_lsl(num_channels=3, sampling_rate=500)
416+
417+
original_notification_handler(sender, data)
418+
419+
for i in range(0, self.ble_connection.NEW_PACKET_LEN, self.ble_connection.SINGLE_SAMPLE_LEN):
420+
sample_data = data[i:i+self.ble_connection.SINGLE_SAMPLE_LEN]
421+
if len(sample_data) == self.ble_connection.SINGLE_SAMPLE_LEN:
422+
channels = [int.from_bytes(sample_data[i:i+2], byteorder='big', signed=True)
423+
for i in range(1, len(sample_data), 2)]
424+
self.last_sample = channels
425+
self.ble_samples_received += 1
426+
427+
if self.lsl_connection: # Push to LSL
428+
self.lsl_connection.push_sample(channels)
429+
if self.recording_active:
430+
self.log_to_csv(channels)
431+
432+
self.ble_connection.notification_handler = notification_handler
273433

274434
try:
275435
if device_address:
@@ -281,46 +441,50 @@ def connect_ble(self, device_address=None):
281441
return False
282442
print(f"Connecting to BLE device: {selected_device.name}")
283443
self.ble_connection.connect(selected_device.address)
284-
285-
self.num_channels = 3
286-
self.sampling_rate = 500
287-
self.setup_lsl(self.num_channels, self.sampling_rate)
288-
289-
self.running = True
290-
self.ble_thread = threading.Thread(target=self.ble_data_handler)
291-
self.ble_thread.daemon = True
292-
self.ble_thread.start()
293-
294-
threading.Thread(target=self.lsl_rate_checker, daemon=True).start()
295-
print("BLE connection established. Streaming data...")
444+
445+
print("BLE connection established. Waiting for data...")
296446
return True
297447
except Exception as e:
298448
print(f"BLE connection failed: {str(e)}")
299449
return False
300450

301451
def connect_wifi(self):
302452
self.wifi_connection = Chords_WIFI()
453+
self.wifi_connection.connect()
454+
455+
self.num_channels = self.wifi_connection.channels
456+
sampling_rate = self.wifi_connection.sampling_rate
457+
458+
if not self.lsl_connection:
459+
self.setup_lsl(self.num_channels, sampling_rate)
303460

304461
try:
305-
if not self.wifi_connection.connect():
306-
print("WiFi connection failed")
307-
return False
462+
print("\nConnected! (Press Ctrl+C to stop)")
463+
while True:
464+
data = self.wifi_connection.ws.recv()
308465

309-
self.num_channels = self.wifi_connection.channels
310-
self.sampling_rate = self.wifi_connection.sampling_rate
311-
self.setup_lsl(self.num_channels, self.sampling_rate)
312-
313-
self.running = True
314-
self.wifi_thread = threading.Thread(target=self.wifi_data_handler)
315-
self.wifi_thread.daemon = True
316-
self.wifi_thread.start()
317-
318-
threading.Thread(target=self.lsl_rate_checker, daemon=True).start()
319-
print("WiFi connection established. Streaming data...")
320-
return True
321-
except Exception as e:
322-
print(f"WiFi connection failed: {str(e)}")
323-
return False
466+
if isinstance(data, (bytes, list)):
467+
for i in range(0, len(data), self.wifi_connection.block_size):
468+
block = data[i:i + self.wifi_connection.block_size]
469+
if len(block) < self.wifi_connection.block_size:
470+
continue
471+
472+
channel_data = []
473+
for ch in range(self.wifi_connection.channels):
474+
offset = 1 + ch * 2
475+
sample = int.from_bytes(block[offset:offset + 2], byteorder='big', signed=True)
476+
channel_data.append(sample)
477+
478+
if self.lsl_connection: # Push to LSL
479+
self.lsl_connection.push_sample(channel_data)
480+
if self.recording_active:
481+
self.log_to_csv(channel_data)
482+
483+
except KeyboardInterrupt:
484+
self.wifi_connection.disconnect()
485+
print("\nDisconnected")
486+
finally:
487+
self.stop_csv_recording()
324488

325489
def connect_usb(self):
326490
self.usb_connection = Chords_USB()
@@ -351,7 +515,7 @@ def cleanup(self):
351515
if self.lsl_connection:
352516
self.lsl_connection = None
353517
self.stream_active = False
354-
print("LSL stream stopped")
518+
print("\nLSL stream stopped")
355519

356520
threads = []
357521
if self.usb_thread and self.usb_thread.is_alive():
@@ -424,7 +588,7 @@ def main():
424588
except KeyboardInterrupt:
425589
print("\nCleanup Completed.")
426590
except Exception as e:
427-
print(f"Error: {str(e)}")
591+
print(f"\nError: {str(e)}")
428592
finally:
429593
manager.cleanup()
430594

0 commit comments

Comments
 (0)