Skip to content

Commit 148c97d

Browse files
committed
NPG-BLE modified(Creating class)
1 parent d676b48 commit 148c97d

File tree

1 file changed

+178
-142
lines changed

1 file changed

+178
-142
lines changed

npg-ble.py

Lines changed: 178 additions & 142 deletions
Original file line numberDiff line numberDiff line change
@@ -5,179 +5,215 @@
55
import sys
66
import argparse
77
import os
8+
import signal
9+
import threading
10+
from typing import Optional
811

912
# BLE parameters (must match your firmware)
1013
DEVICE_NAME_PREFIX = "NPG"
1114
SERVICE_UUID = "4fafc201-1fb5-459e-8fcc-c5c9c331914b"
1215
DATA_CHAR_UUID = "beb5483e-36e1-4688-b7f5-ea07361b26a8"
1316
CONTROL_CHAR_UUID = "0000ff01-0000-1000-8000-00805f9b34fb"
1417

15-
# Packet parameters for batched samples:
16-
SINGLE_SAMPLE_LEN = 7 # Each sample is 7 bytes
17-
BLOCK_COUNT = 10 # Batch size: 10 samples per notification
18-
NEW_PACKET_LEN = SINGLE_SAMPLE_LEN * BLOCK_COUNT # Total packet length (70 bytes)
18+
# Packet parameters
19+
SINGLE_SAMPLE_LEN = 7
20+
BLOCK_COUNT = 10
21+
NEW_PACKET_LEN = SINGLE_SAMPLE_LEN * BLOCK_COUNT
1922

20-
# Global variables
21-
prev_unrolled_counter = None
22-
samples_received = 0
23-
start_time = None
24-
total_missing_samples = 0
25-
outlet = None
26-
last_received_time = None
27-
DATA_TIMEOUT = 2.0
28-
monitor_task = None
29-
client = None
23+
class NPGBluetoothClient:
24+
def __init__(self):
25+
self.prev_unrolled_counter = None
26+
self.samples_received = 0
27+
self.start_time = None
28+
self.total_missing_samples = 0
29+
self.outlet = None
30+
self.last_received_time = None
31+
self.DATA_TIMEOUT = 2.0
32+
self.client = None
33+
self.monitor_task = None
34+
self.print_rate_task = None
35+
self.running = False
36+
self.loop = None
37+
self.connection_event = threading.Event()
38+
self.stop_event = threading.Event()
39+
40+
def process_sample(self, sample_data: bytearray):
41+
self.last_received_time = time.time()
42+
43+
if len(sample_data) != SINGLE_SAMPLE_LEN:
44+
print("Unexpected sample length:", len(sample_data))
45+
return
46+
47+
sample_counter = sample_data[0]
48+
if self.prev_unrolled_counter is None:
49+
self.prev_unrolled_counter = sample_counter
50+
else:
51+
last = self.prev_unrolled_counter % 256
52+
if sample_counter < last:
53+
current_unrolled = self.prev_unrolled_counter - last + sample_counter + 256
54+
else:
55+
current_unrolled = self.prev_unrolled_counter - last + sample_counter
56+
57+
if current_unrolled != self.prev_unrolled_counter + 1:
58+
missing = current_unrolled - (self.prev_unrolled_counter + 1)
59+
print(f"Missing {missing} sample(s)")
60+
self.total_missing_samples += missing
61+
62+
self.prev_unrolled_counter = current_unrolled
63+
64+
if self.start_time is None:
65+
self.start_time = time.time()
66+
67+
channels = [
68+
int.from_bytes(sample_data[1:3], byteorder='big', signed=True),
69+
int.from_bytes(sample_data[3:5], byteorder='big', signed=True),
70+
int.from_bytes(sample_data[5:7], byteorder='big', signed=True)]
71+
72+
if self.outlet:
73+
self.outlet.push_sample(channels)
74+
75+
self.samples_received += 1
76+
77+
if self.samples_received % 500 == 0:
78+
elapsed = time.time() - self.start_time
79+
print(f"Received {self.samples_received} samples in {elapsed:.2f}s")
80+
81+
def notification_handler(self, sender, data: bytearray):
82+
try:
83+
if len(data) == NEW_PACKET_LEN:
84+
for i in range(0, NEW_PACKET_LEN, SINGLE_SAMPLE_LEN):
85+
self.process_sample(data[i:i+SINGLE_SAMPLE_LEN])
86+
elif len(data) == SINGLE_SAMPLE_LEN:
87+
self.process_sample(data)
88+
else:
89+
print(f"Unexpected packet length: {len(data)} bytes")
90+
except Exception as e:
91+
print(f"Error processing data: {e}")
92+
93+
async def print_rate(self):
94+
while not self.stop_event.is_set():
95+
await asyncio.sleep(1)
96+
print(f"Samples per second: {self.samples_received}")
97+
self.samples_received = 0
98+
99+
async def monitor_connection(self):
100+
while not self.stop_event.is_set():
101+
if self.last_received_time and (time.time() - self.last_received_time) > self.DATA_TIMEOUT:
102+
print("\nData Interrupted")
103+
self.running = False
104+
break
105+
if self.client and not self.client.is_connected:
106+
print("\nBluetooth disconnected")
107+
self.running = False
108+
break
109+
await asyncio.sleep(0.5)
110+
111+
async def async_connect(self, device_address):
112+
try:
113+
print(f"Attempting to connect to {device_address}...")
114+
115+
# Set up LSL stream
116+
info = StreamInfo("NPG", "EXG", 3, 500, "int16", "npg1234")
117+
self.outlet = StreamOutlet(info)
118+
119+
self.client = BleakClient(device_address)
120+
await self.client.connect()
121+
122+
if not self.client.is_connected:
123+
print("Failed to connect")
124+
return False
125+
126+
print(f"Connected to {device_address}", flush=True)
127+
self.connection_event.set()
128+
129+
self.last_received_time = time.time()
130+
self.monitor_task = asyncio.create_task(self.monitor_connection())
131+
self.print_rate_task = asyncio.create_task(self.print_rate())
132+
133+
# Send start command
134+
await self.client.write_gatt_char(CONTROL_CHAR_UUID, b"START", response=True)
135+
print("Sent START command")
136+
137+
# Subscribe to notifications
138+
await self.client.start_notify(DATA_CHAR_UUID, self.notification_handler)
139+
print("Subscribed to data notifications")
140+
141+
self.running = True
142+
while self.running and not self.stop_event.is_set():
143+
await asyncio.sleep(1)
144+
145+
return True
146+
147+
except Exception as e:
148+
print(f"Connection error: {str(e)}")
149+
return False
150+
finally:
151+
await self.cleanup()
152+
153+
async def cleanup(self):
154+
if self.monitor_task:
155+
self.monitor_task.cancel()
156+
if self.print_rate_task:
157+
self.print_rate_task.cancel()
158+
if self.client and self.client.is_connected:
159+
await self.client.disconnect()
160+
self.running = False
161+
self.connection_event.clear()
162+
163+
def connect(self, device_address):
164+
self.loop = asyncio.new_event_loop()
165+
asyncio.set_event_loop(self.loop)
166+
167+
try:
168+
self.loop.run_until_complete(self.async_connect(device_address))
169+
except Exception as e:
170+
print(f"Error in connection: {str(e)}")
171+
return False
172+
finally:
173+
if self.loop.is_running():
174+
self.loop.close()
175+
176+
def stop(self):
177+
self.stop_event.set()
178+
self.running = False
179+
if self.loop and self.loop.is_running():
180+
self.loop.call_soon_threadsafe(self.loop.stop)
30181

31182
def parse_args():
32183
parser = argparse.ArgumentParser()
33-
parser.add_argument("--scan", action="store_true", help="Scan for devices and print them")
34-
parser.add_argument("--connect", type=str, help="Connect to a specific device address")
184+
parser.add_argument("--scan", action="store_true", help="Scan for devices")
185+
parser.add_argument("--connect", type=str, help="Connect to device address")
35186
return parser.parse_args()
36187

37188
async def scan_devices():
38-
print("Scanning for BLE devices...", file=sys.stderr)
189+
print("Scanning for BLE devices...")
39190
devices = await BleakScanner.discover()
40191
filtered = [d for d in devices if d.name and d.name.startswith(DEVICE_NAME_PREFIX)]
41192

42193
if not filtered:
43-
print("No devices found.", file=sys.stderr)
194+
print("No devices found.")
44195
return
45196

46-
# Print devices in format that Flask can parse
47197
for dev in filtered:
48198
print(f"DEVICE:{dev.name}|{dev.address}")
49199

50-
def process_sample(sample_data: bytearray):
51-
global prev_unrolled_counter, samples_received, start_time, total_missing_samples, outlet, last_received_time
52-
last_received_time = time.time()
53-
54-
if len(sample_data) != SINGLE_SAMPLE_LEN:
55-
print("Unexpected sample length:", len(sample_data))
56-
return
57-
58-
sample_counter = sample_data[0]
59-
# Unroll the counter:
60-
if prev_unrolled_counter is None:
61-
prev_unrolled_counter = sample_counter
62-
else:
63-
last = prev_unrolled_counter % 256
64-
if sample_counter < last:
65-
current_unrolled = prev_unrolled_counter - last + sample_counter + 256
66-
else:
67-
current_unrolled = prev_unrolled_counter - last + sample_counter
68-
69-
if current_unrolled != prev_unrolled_counter + 1:
70-
missing = current_unrolled - (prev_unrolled_counter + 1)
71-
print(f"Missing {missing} sample(s): expected {prev_unrolled_counter + 1}, got {current_unrolled}")
72-
total_missing_samples += missing
73-
74-
prev_unrolled_counter = current_unrolled
75-
76-
# Set start_time when first sample is received
77-
if start_time is None:
78-
start_time = time.time()
79-
80-
# Process channels
81-
channels = [
82-
int.from_bytes(sample_data[1:3], byteorder='little'), # Channel 0
83-
int.from_bytes(sample_data[3:5], byteorder='little'), # Channel 1
84-
int.from_bytes(sample_data[5:7], byteorder='little')] # Channel 2
85-
86-
# Push to LSL
87-
if outlet:
88-
outlet.push_sample(channels)
89-
90-
samples_received += 1
91-
92-
# Periodic status print
93-
if samples_received % 100 == 0:
94-
elapsed = time.time() - start_time
95-
print(f"Sample {prev_unrolled_counter} at {elapsed:.2f}s - Channels: {channels} - Missing: {total_missing_samples}")
96-
97-
def notification_handler(sender, data: bytearray):
98-
try:
99-
if len(data) == NEW_PACKET_LEN:
100-
for i in range(0, NEW_PACKET_LEN, SINGLE_SAMPLE_LEN):
101-
process_sample(data[i:i+SINGLE_SAMPLE_LEN])
102-
elif len(data) == SINGLE_SAMPLE_LEN:
103-
process_sample(data)
104-
else:
105-
print(f"Unexpected packet length: {len(data)} bytes")
106-
except Exception as e:
107-
print(f"Error processing data: {e}")
108-
109-
async def monitor_connection():
110-
global last_received_time, client
111-
112-
while True:
113-
if last_received_time and (time.time() - last_received_time) > DATA_TIMEOUT:
114-
print("\nData Interrupted")
115-
os._exit(1)
116-
if client and not client.is_connected:
117-
print("\nData Interrupted (Bluetooth disconnected)")
118-
os._exit(1)
119-
120-
await asyncio.sleep(0.5)
121-
122-
async def connect_to_device(device_address):
123-
global outlet, last_received_time, monitor_task, client
124-
125-
print(f"Attempting to connect to {device_address}...", file=sys.stderr)
126-
127-
# Set up LSL stream (500Hz sampling rate)
128-
info = StreamInfo("NPG", "EXG", 3, 500, "int16", "npg1234")
129-
outlet = StreamOutlet(info)
130-
131-
client = BleakClient(device_address)
132-
try:
133-
await client.connect()
134-
135-
if not client.is_connected:
136-
print("Failed to connect", file=sys.stderr)
137-
return False
138-
139-
print(f"Connected to {device_address}")
140-
141-
last_received_time = time.time()
142-
monitor_task = asyncio.create_task(monitor_connection())
143-
144-
# Send start command
145-
await client.write_gatt_char(CONTROL_CHAR_UUID, b"START", response=True)
146-
print("Sent START command")
147-
148-
# Subscribe to notifications
149-
await client.start_notify(DATA_CHAR_UUID, notification_handler)
150-
print("Subscribed to data notifications")
151-
152-
# Keep connection alive
153-
while client.is_connected:
154-
await asyncio.sleep(1)
155-
156-
return True
157-
158-
except Exception as e:
159-
print(f"Connection error: {str(e)}", file=sys.stderr)
160-
return False
161-
finally:
162-
if monitor_task:
163-
monitor_task.cancel()
164-
if client and client.is_connected:
165-
await client.disconnect()
166-
167200
if __name__ == "__main__":
168201
args = parse_args()
202+
client = NPGBluetoothClient()
169203

170204
try:
171205
if args.scan:
172206
asyncio.run(scan_devices())
173207
elif args.connect:
174-
asyncio.run(connect_to_device(args.connect))
208+
client.connect(args.connect)
209+
try:
210+
while client.running:
211+
time.sleep(1)
212+
except KeyboardInterrupt:
213+
client.stop()
175214
else:
176-
print("Please specify --scan or --connect", file=sys.stderr)
215+
print("Please specify --scan or --connect")
177216
sys.exit(1)
178-
except KeyboardInterrupt:
179-
print("\nScript terminated by user")
180-
sys.exit(0)
181217
except Exception as e:
182-
print(f"\nError: {str(e)}", file=sys.stderr)
218+
print(f"Error: {str(e)}")
183219
sys.exit(1)

0 commit comments

Comments
 (0)