13
13
CONTROL_CHAR_UUID = "0000ff01-0000-1000-8000-00805f9b34fb"
14
14
15
15
# Packet parameters
16
- SINGLE_SAMPLE_LEN = 7
16
+ SINGLE_SAMPLE_LEN = 7 # (1 Counter + 3 Channels * 2 bytes)
17
17
BLOCK_COUNT = 10
18
18
NEW_PACKET_LEN = SINGLE_SAMPLE_LEN * BLOCK_COUNT
19
19
20
20
class NPGBluetoothClient :
21
21
def __init__ (self ):
22
- self .prev_unrolled_counter = None
23
- self .samples_received = 0
24
- self .start_time = None
25
- self .total_missing_samples = 0
26
- self .outlet = None
27
- self .last_received_time = None
28
- self .DATA_TIMEOUT = 2.0
29
- self .client = None
30
- self .monitor_task = None
31
- self .print_rate_task = None
32
- self .running = False
33
- self .loop = None
34
- self .connection_event = threading .Event ()
35
- self .stop_event = threading .Event ()
22
+ self .prev_unrolled_counter = None # Previous Counter
23
+ self .samples_received = 0 # Number of Samples received
24
+ self .start_time = None # Start time of the first sample
25
+ self .total_missing_samples = 0 # Total missing samples
26
+ self .outlet = None # LSL outlet
27
+ self .last_received_time = None # Last time a sample was received
28
+ self .DATA_TIMEOUT = 2.0 # Timeout for considering data interrupted
29
+ self .client = None # Bleak client instance
30
+ self .monitor_task = None # Task for monitoring connection
31
+ self .print_rate_task = None # Task for printing sample rate
32
+ self .running = False # Flag indicating if NPGBluetoothClient is running or not
33
+ self .loop = None # Event loop for asyncio
34
+ self .connection_event = threading .Event () # Event for connection status
35
+ self .stop_event = threading .Event () # Event for stopping all operations
36
36
37
37
def process_sample (self , sample_data : bytearray ):
38
+ """Process a single EEG sample packet"""
38
39
self .last_received_time = time .time ()
39
40
41
+ # Validate Sample Length
40
42
if len (sample_data ) != SINGLE_SAMPLE_LEN :
41
43
print ("Unexpected sample length:" , len (sample_data ))
42
44
return
43
-
45
+
46
+ # Extract and Validate Sample Counter
44
47
sample_counter = sample_data [0 ]
45
48
if self .prev_unrolled_counter is None :
46
49
self .prev_unrolled_counter = sample_counter
47
50
else :
51
+ # Calculate unrolled counter (handling 0-255)
48
52
last = self .prev_unrolled_counter % 256
49
53
if sample_counter < last :
50
54
current_unrolled = self .prev_unrolled_counter - last + sample_counter + 256
51
55
else :
52
56
current_unrolled = self .prev_unrolled_counter - last + sample_counter
53
57
58
+ # Check for missing samples
54
59
if current_unrolled != self .prev_unrolled_counter + 1 :
55
60
missing = current_unrolled - (self .prev_unrolled_counter + 1 )
56
61
print (f"Missing { missing } sample(s)" )
57
62
self .total_missing_samples += missing
58
63
59
64
self .prev_unrolled_counter = current_unrolled
60
-
65
+
66
+ # Initialize timing on first sample received
61
67
if self .start_time is None :
62
68
self .start_time = time .time ()
63
69
70
+ # Extract 3 channels of EEG data (16-bit signed integers, big-endian)
64
71
channels = [
65
72
int .from_bytes (sample_data [1 :3 ], byteorder = 'big' , signed = True ),
66
73
int .from_bytes (sample_data [3 :5 ], byteorder = 'big' , signed = True ),
67
74
int .from_bytes (sample_data [5 :7 ], byteorder = 'big' , signed = True )]
68
75
76
+ # Push sample to LSL outlet
69
77
if self .outlet :
70
78
self .outlet .push_sample (channels )
71
79
72
80
self .samples_received += 1
73
81
82
+ # Periodically print the number of samples received and the elapsed time when 500 samples are received
74
83
if self .samples_received % 500 == 0 :
75
84
elapsed = time .time () - self .start_time
76
85
print (f"Received { self .samples_received } samples in { elapsed :.2f} s" )
77
86
78
87
def notification_handler (self , sender , data : bytearray ):
88
+ """Handle incoming notifications from the BLE device"""
79
89
try :
80
90
if len (data ) == NEW_PACKET_LEN :
81
91
for i in range (0 , NEW_PACKET_LEN , SINGLE_SAMPLE_LEN ):
@@ -88,44 +98,46 @@ def notification_handler(self, sender, data: bytearray):
88
98
print (f"Error processing data: { e } " )
89
99
90
100
async def print_rate (self ):
91
- while not self .stop_event .is_set ():
101
+ """Periodically print the sample rate every second"""
102
+ while not self .stop_event .is_set (): # Continue running until stop event is triggered
92
103
await asyncio .sleep (1 )
93
104
print (f"Samples per second: { self .samples_received } " )
94
- self .samples_received = 0
105
+ self .samples_received = 0 # Reset the counter after printing
95
106
96
107
async def monitor_connection (self ):
97
- while not self .stop_event .is_set ():
98
- if self .last_received_time and (time .time () - self .last_received_time ) > self .DATA_TIMEOUT :
108
+ """Monitor the connection status and check for data interruptions"""
109
+ while not self .stop_event .is_set (): # Continue running until stop event is triggered
110
+ if self .last_received_time and (time .time () - self .last_received_time ) > self .DATA_TIMEOUT : # Check for Data Timeout
99
111
print ("\n Data Interrupted" )
100
112
self .running = False
101
113
break
102
- if self .client and not self .client .is_connected :
114
+ if self .client and not self .client .is_connected : # Check for BLE Disconnection
103
115
print ("\n Data Interrupted (Bluetooth disconnected)" )
104
- self .running = False
105
- break
106
- await asyncio .sleep (0.5 )
116
+ self .running = False # Set running flag to False
117
+ break # Exit the monitoring loop
118
+ await asyncio .sleep (0.5 ) # Short sleep to prevent busy-waiting
107
119
108
120
async def async_connect (self , device_address ):
121
+ """Asynchronous function to establish BLE connection and start data streaming"""
109
122
try :
110
123
print (f"Attempting to connect to { device_address } ..." )
111
124
112
- # Set up LSL stream
113
- info = StreamInfo ("NPG" , "EXG" , 3 , 500 , "int16" , "npg1234" )
114
- self .outlet = StreamOutlet (info )
125
+ info = StreamInfo ("NPG" , "EXG" , 3 , 500 , "int16" , "npg1234" ) # Set up LSL stream
126
+ self .outlet = StreamOutlet (info ) # Create the LSL output stream
115
127
116
- self .client = BleakClient (device_address )
117
- await self .client .connect ()
128
+ self .client = BleakClient (device_address ) # Initialize and connect BLE client using the device address
129
+ await self .client .connect () # Asynchronously connect to the BLE device
118
130
119
- if not self .client .is_connected :
131
+ if not self .client .is_connected : # Verify connection was successful
120
132
print ("Failed to connect" )
121
- return False
133
+ return False # Return False if connection failed
122
134
123
135
print (f"Connected to { device_address } " , flush = True )
124
- self .connection_event .set ()
136
+ self .connection_event .set () # Shows connection is established
125
137
126
- self .last_received_time = time .time ()
127
- self .monitor_task = asyncio .create_task (self .monitor_connection ())
128
- self .print_rate_task = asyncio .create_task (self .print_rate ())
138
+ self .last_received_time = time .time () # Record current time as last received
139
+ self .monitor_task = asyncio .create_task (self .monitor_connection ()) # Task to monitor connection status
140
+ self .print_rate_task = asyncio .create_task (self .print_rate ()) # Task to periodically print sample rate
129
141
130
142
# Send start command
131
143
await self .client .write_gatt_char (CONTROL_CHAR_UUID , b"START" , response = True )
@@ -135,6 +147,7 @@ async def async_connect(self, device_address):
135
147
await self .client .start_notify (DATA_CHAR_UUID , self .notification_handler )
136
148
print ("Subscribed to data notifications" )
137
149
150
+ # Main processing loop
138
151
self .running = True
139
152
while self .running and not self .stop_event .is_set ():
140
153
await asyncio .sleep (1 )
@@ -148,62 +161,66 @@ async def async_connect(self, device_address):
148
161
await self .cleanup ()
149
162
150
163
async def cleanup (self ):
164
+ """Clean up resources and disconnect from the BLE device"""
151
165
if self .monitor_task :
152
- self .monitor_task .cancel ()
166
+ self .monitor_task .cancel () # Cancel the background monitoring task if it exists
153
167
if self .print_rate_task :
154
- self .print_rate_task .cancel ()
168
+ self .print_rate_task .cancel () # Cancel the sample rate printing task if it exists
155
169
if self .client and self .client .is_connected :
156
- await self .client .disconnect ()
157
- self .running = False
158
- self .connection_event .clear ()
170
+ await self .client .disconnect () # Disconnect from the BLE device if currently connected
171
+ self .running = False # Set running flag to False
172
+ self .connection_event .clear () # Clear the connection event flag
159
173
160
174
def connect (self , device_address ):
161
- self .loop = asyncio .new_event_loop ()
162
- asyncio .set_event_loop (self .loop )
175
+ self .loop = asyncio .new_event_loop () # Create a new async event loop (required for async operations)
176
+ asyncio .set_event_loop (self .loop ) # Set this as the active loop for our thread
163
177
164
178
try :
165
- self .loop .run_until_complete (self .async_connect (device_address ))
179
+ self .loop .run_until_complete (self .async_connect (device_address )) # Run the async connection until it finishes
166
180
except Exception as e :
167
- print (f"Error in connection: { str (e )} " )
181
+ print (f"Error in connection: { str (e )} " ) # If connection fails, print error and return False
168
182
return False
169
183
finally :
170
- if self .loop .is_running ():
184
+ if self .loop .is_running (): # Always clean up by closing the loop when everything is done
171
185
self .loop .close ()
172
186
173
187
def stop (self ):
188
+ """Stop all operations and clean up"""
174
189
self .stop_event .set ()
175
190
self .running = False
176
191
if self .loop and self .loop .is_running ():
177
192
self .loop .call_soon_threadsafe (self .loop .stop )
178
193
179
194
def parse_args ():
195
+ """Parse command line arguments"""
180
196
parser = argparse .ArgumentParser ()
181
197
parser .add_argument ("--scan" , action = "store_true" , help = "Scan for devices" )
182
198
parser .add_argument ("--connect" , type = str , help = "Connect to device address" )
183
199
return parser .parse_args ()
184
200
185
201
async def scan_devices ():
202
+ """Scan for BLE devices with NPG prefix"""
186
203
print ("Scanning for BLE devices..." )
187
- devices = await BleakScanner .discover ()
188
- filtered = [d for d in devices if d .name and d .name .startswith (DEVICE_NAME_PREFIX )]
204
+ devices = await BleakScanner .discover () # Discover all nearby BLE devices
205
+ filtered = [d for d in devices if d .name and d .name .startswith (DEVICE_NAME_PREFIX )] # Filter devices to only those with matching name prefix
189
206
190
207
if not filtered :
191
208
print ("No devices found." )
192
209
return
193
210
194
- for dev in filtered :
211
+ for dev in filtered : # Print each matching device's name and address
195
212
print (f"DEVICE:{ dev .name } |{ dev .address } " )
196
213
197
214
if __name__ == "__main__" :
198
- args = parse_args ()
199
- client = NPGBluetoothClient ()
215
+ args = parse_args () # Handle command line arguments
216
+ client = NPGBluetoothClient () # Create Bluetooth client instance
200
217
201
218
try :
202
- if args .scan :
219
+ if args .scan : # Scan flag - discover available devices
203
220
asyncio .run (scan_devices ())
204
- elif args .connect :
221
+ elif args .connect : # Connect flag - connect to a specific device
205
222
client .connect (args .connect )
206
- try :
223
+ try : # Keep running until data interrupted or connection fails
207
224
while client .running :
208
225
time .sleep (1 )
209
226
except KeyboardInterrupt :
0 commit comments