1
+ import asyncio
2
+ from bleak import BleakScanner , BleakClient
3
+ import time
4
+ import sys
5
+ import argparse
6
+ import threading
7
+
8
+ class Chords_BLE :
9
+ # Class constants
10
+ DEVICE_NAME_PREFIX = "NPG"
11
+ SERVICE_UUID = "4fafc201-1fb5-459e-8fcc-c5c9c331914b"
12
+ DATA_CHAR_UUID = "beb5483e-36e1-4688-b7f5-ea07361b26a8"
13
+ CONTROL_CHAR_UUID = "0000ff01-0000-1000-8000-00805f9b34fb"
14
+
15
+ # Packet parameters
16
+ SINGLE_SAMPLE_LEN = 7 # (1 Counter + 3 Channels * 2 bytes)
17
+ BLOCK_COUNT = 10
18
+ NEW_PACKET_LEN = SINGLE_SAMPLE_LEN * BLOCK_COUNT
19
+
20
+ def __init__ (self ):
21
+ self .prev_unrolled_counter = None
22
+ self .samples_received = 0
23
+ self .start_time = None
24
+ self .total_missing_samples = 0
25
+ self .last_received_time = None
26
+ self .DATA_TIMEOUT = 2.0
27
+ self .client = None
28
+ self .monitor_task = None
29
+ self .print_rate_task = None
30
+ self .running = False
31
+ self .loop = None
32
+ self .connection_event = threading .Event ()
33
+ self .stop_event = threading .Event ()
34
+
35
+ @classmethod
36
+ async def scan_devices (cls ):
37
+ print ("Scanning for BLE devices..." )
38
+ devices = await BleakScanner .discover ()
39
+ filtered = [d for d in devices if d .name and d .name .startswith (cls .DEVICE_NAME_PREFIX )]
40
+
41
+ if not filtered :
42
+ print ("No NPG devices found." )
43
+ return []
44
+
45
+ return filtered
46
+
47
+ def process_sample (self , sample_data : bytearray ):
48
+ """Process a single EEG sample packet"""
49
+ self .last_received_time = time .time ()
50
+
51
+ if len (sample_data ) != self .SINGLE_SAMPLE_LEN :
52
+ print ("Unexpected sample length:" , len (sample_data ))
53
+ return
54
+
55
+ sample_counter = sample_data [0 ]
56
+ if self .prev_unrolled_counter is None :
57
+ self .prev_unrolled_counter = sample_counter
58
+ else :
59
+ last = self .prev_unrolled_counter % 256
60
+ if sample_counter < last :
61
+ current_unrolled = self .prev_unrolled_counter - last + sample_counter + 256
62
+ else :
63
+ current_unrolled = self .prev_unrolled_counter - last + sample_counter
64
+
65
+ if current_unrolled != self .prev_unrolled_counter + 1 :
66
+ missing = current_unrolled - (self .prev_unrolled_counter + 1 )
67
+ print (f"Missing { missing } sample(s)" )
68
+ self .total_missing_samples += missing
69
+
70
+ self .prev_unrolled_counter = current_unrolled
71
+
72
+ if self .start_time is None :
73
+ self .start_time = time .time ()
74
+
75
+ channels = [
76
+ int .from_bytes (sample_data [1 :3 ], byteorder = 'big' , signed = True ),
77
+ int .from_bytes (sample_data [3 :5 ], byteorder = 'big' , signed = True ),
78
+ int .from_bytes (sample_data [5 :7 ], byteorder = 'big' , signed = True )]
79
+
80
+ self .samples_received += 1
81
+
82
+ def notification_handler (self , sender , data : bytearray ):
83
+ """Handle incoming notifications from the BLE device"""
84
+ try :
85
+ if len (data ) == self .NEW_PACKET_LEN :
86
+ for i in range (0 , self .NEW_PACKET_LEN , self .SINGLE_SAMPLE_LEN ):
87
+ self .process_sample (data [i :i + self .SINGLE_SAMPLE_LEN ])
88
+ elif len (data ) == self .SINGLE_SAMPLE_LEN :
89
+ self .process_sample (data )
90
+ else :
91
+ print (f"Unexpected packet length: { len (data )} bytes" )
92
+ except Exception as e :
93
+ print (f"Error processing data: { e } " )
94
+
95
+ async def print_rate (self ):
96
+ while not self .stop_event .is_set ():
97
+ await asyncio .sleep (1 )
98
+ self .samples_received = 0
99
+
100
+ async def monitor_connection (self ):
101
+ """Monitor the connection status and check for data interruptions"""
102
+ while not self .stop_event .is_set ():
103
+ if self .last_received_time and (time .time () - self .last_received_time ) > self .DATA_TIMEOUT :
104
+ print ("\n Data Interrupted" )
105
+ print ("Cleanup Completed." )
106
+ self .running = False
107
+ break
108
+ if self .client and not self .client .is_connected :
109
+ print ("\n Data Interrupted (Bluetooth disconnected)" )
110
+ print ("Cleanup Completed." )
111
+ self .running = False
112
+ break
113
+ await asyncio .sleep (0.5 )
114
+
115
+ async def async_connect (self , device_address ):
116
+ try :
117
+ print (f"Attempting to connect to { device_address } ..." )
118
+
119
+ self .client = BleakClient (device_address )
120
+ await self .client .connect ()
121
+
122
+ if not self .client .is_connected :
123
+ print ("Failed to connect" )
124
+ return False
125
+
126
+ print (f"Connected to { device_address } " , flush = True )
127
+ self .connection_event .set ()
128
+
129
+ self .last_received_time = time .time ()
130
+ self .monitor_task = asyncio .create_task (self .monitor_connection ())
131
+ self .print_rate_task = asyncio .create_task (self .print_rate ())
132
+
133
+ await self .client .write_gatt_char (self .CONTROL_CHAR_UUID , b"START" , response = True )
134
+ print ("Sent START command" )
135
+
136
+ await self .client .start_notify (self .DATA_CHAR_UUID , self .notification_handler )
137
+ print ("Subscribed to data notifications" )
138
+
139
+ self .running = True
140
+ while self .running and not self .stop_event .is_set ():
141
+ await asyncio .sleep (1 )
142
+
143
+ return True
144
+
145
+ except Exception as e :
146
+ print (f"Connection error: { str (e )} " )
147
+ return False
148
+ finally :
149
+ await self .cleanup ()
150
+
151
+ async def cleanup (self ):
152
+ if self .monitor_task :
153
+ self .monitor_task .cancel ()
154
+ if self .print_rate_task :
155
+ self .print_rate_task .cancel ()
156
+ if self .client and self .client .is_connected :
157
+ await self .client .disconnect ()
158
+ self .running = False
159
+ self .connection_event .clear ()
160
+
161
+ def connect (self , device_address ):
162
+ self .loop = asyncio .new_event_loop ()
163
+ asyncio .set_event_loop (self .loop )
164
+
165
+ try :
166
+ self .loop .run_until_complete (self .async_connect (device_address ))
167
+ except Exception as e :
168
+ print (f"Error in connection: { str (e )} " )
169
+ return False
170
+ finally :
171
+ if self .loop .is_running ():
172
+ self .loop .close ()
173
+
174
+ def stop (self ):
175
+ self .stop_event .set ()
176
+ self .running = False
177
+ if self .loop and self .loop .is_running ():
178
+ self .loop .call_soon_threadsafe (self .loop .stop )
179
+
180
+ def parse_args ():
181
+ parser = argparse .ArgumentParser ()
182
+ parser .add_argument ("--scan" , action = "store_true" , help = "Scan for devices" )
183
+ parser .add_argument ("--connect" , type = str , help = "Connect to device address" )
184
+ return parser .parse_args ()
185
+
186
+ if __name__ == "__main__" :
187
+ args = parse_args ()
188
+ client = Chords_BLE ()
189
+
190
+ try :
191
+ if args .scan :
192
+ devices = asyncio .run (Chords_BLE .scan_devices ())
193
+ for dev in devices :
194
+ print (f"DEVICE:{ dev .name } |{ dev .address } " )
195
+ elif args .connect :
196
+ client .connect (args .connect )
197
+ try :
198
+ while client .running :
199
+ time .sleep (1 )
200
+ except KeyboardInterrupt :
201
+ client .stop ()
202
+ else :
203
+ print ("Please specify --scan or --connect" )
204
+ sys .exit (1 )
205
+ except Exception as e :
206
+ print (f"Error: { str (e )} " )
207
+ sys .exit (1 )
0 commit comments