9
9
10
10
class EOGPeakDetector :
11
11
def __init__ (self , blink_button , keystroke_action , connect_button ):
12
- self .inlet = None
13
- self .sampling_rate = None
14
- self .buffer_size = None
15
- self .eog_data = None
16
- self .current_index = 0
17
- self .b , self .a = None , None
18
- self .blink_button = blink_button
19
- self .keystroke_action = keystroke_action
20
- self .connect_button = connect_button
21
- self .blink_detected = False
22
- self .running = False
23
- self .connected = False
24
- self .last_blink_time = 0
25
- self .refractory_period = 0.2
26
- self .stop_threads = False
12
+ self .inlet = None # LSL inlet for receiving data
13
+ self .sampling_rate = None # Sampling rate of the data stream
14
+ self .buffer_size = None # Size of the buffer for storing EOG data
15
+ self .eog_data = None # Buffer for EOG data
16
+ self .current_index = 0 # Current index in the buffer
17
+ self .b , self .a = None , None # Filter coefficients for low-pass filter
18
+ self .blink_button = blink_button # Button to trigger blink action
19
+ self .keystroke_action = keystroke_action # Action to perform on blink detection
20
+ self .connect_button = connect_button # Button to connect to LSL stream
21
+ self .blink_detected = False # Flag to indicate if a blink has been detected
22
+ self .running = False # Flag to control the detection loop
23
+ self .connected = False # Flag to indicate if connected to LSL stream
24
+ self .last_blink_time = 0 # Last time a blink was detected
25
+ self .refractory_period = 0.2 # Refractory period to prevent multiple eye blink detections
26
+ self .stop_threads = False # Flag to stop threads
27
27
28
28
def initialize_stream (self ):
29
+ """Initialize the LSL stream connection and set up the buffer and filter coefficients."""
29
30
print ("Searching for available LSL streams..." )
30
31
available_streams = pylsl .resolve_streams ()
31
32
@@ -42,10 +43,10 @@ def initialize_stream(self):
42
43
print (f"Sampling rate: { self .sampling_rate } Hz" )
43
44
44
45
# Set buffer size and filter coefficients
45
- self .buffer_size = self .sampling_rate * 1
46
- self .eog_data = np .zeros (self .buffer_size )
47
- self .b , self .a = butter (4 , 10.0 / (0.5 * self .sampling_rate ), btype = 'low' )
48
- self .connected = True
46
+ self .buffer_size = self .sampling_rate * 1 # Buffer size for 1 second of data
47
+ self .eog_data = np .zeros (self .buffer_size ) # Initialize buffer for EOG data
48
+ self .b , self .a = butter (4 , 10.0 / (0.5 * self .sampling_rate ), btype = 'low' ) # Low-pass filter coefficients
49
+ self .connected = True # Set connected flag to True(LSL Stream connected)
49
50
print ("LSL stream connected successfully." )
50
51
return True # Stop trying after first successful connection
51
52
@@ -57,27 +58,31 @@ def initialize_stream(self):
57
58
return False
58
59
59
60
def start_detection (self ):
61
+ """Start the peak detection process"""
60
62
print ("Starting peak detection..." )
61
- self .running = True
63
+ self .running = True # Flag to control the detection loop
62
64
while self .running :
63
65
try :
64
66
samples , _ = self .inlet .pull_chunk (timeout = 1.0 , max_samples = 1 )
65
67
if samples :
66
68
for sample in samples :
67
- self .eog_data [self .current_index ] = sample [0 ]
68
- self .current_index = (self .current_index + 1 ) % self .buffer_size
69
+ self .eog_data [self .current_index ] = sample [0 ] # Store sample in circular buffer at current position
70
+ self .current_index = (self .current_index + 1 ) % self .buffer_size # Update index with wrap-around using modulo
69
71
70
72
filtered_eog = lfilter (self .b , self .a , self .eog_data )
71
- self .detect_blinks (filtered_eog )
73
+ self .detect_blinks (filtered_eog ) # Run blink detection on the filtered signal
72
74
except Exception as e :
73
75
print (f"Error in detection: { e } " )
74
76
break
75
77
76
78
def stop_detection (self ):
79
+ """Stop the peak detection process"""
77
80
print ("Stopping peak detection..." )
78
- self .running = False
81
+ self .running = False # Set running flag to False to stop the detection loop
79
82
80
83
def detect_blinks (self , filtered_eog ):
84
+ """Detect blinks in the filtered EOG signal using a threshold-based method."""
85
+ # Calculate dynamic threshold based on signal statistics
81
86
mean_signal = np .mean (filtered_eog )
82
87
stdev_signal = np .std (filtered_eog )
83
88
threshold = mean_signal + (1.7 * stdev_signal )
@@ -87,8 +92,8 @@ def detect_blinks(self, filtered_eog):
87
92
if start_index < 0 :
88
93
start_index = 0
89
94
90
- filtered_window = filtered_eog [start_index :self .current_index ]
91
- peaks = self .detect_peaks (filtered_window , threshold )
95
+ filtered_window = filtered_eog [start_index :self .current_index ] # Get the current window of filtered EOG data
96
+ peaks = self .detect_peaks (filtered_window , threshold ) # Detect peaks above threshold in the current window
92
97
93
98
current_time = time .time ()
94
99
if peaks and (current_time - self .last_blink_time > self .refractory_period ):
@@ -105,6 +110,7 @@ def detect_peaks(self, signal, threshold):
105
110
return peaks
106
111
107
112
def trigger_action (self ):
113
+ """Trigger the keystroke action when a blink is detected."""
108
114
if not self .blink_detected :
109
115
self .blink_detected = True
110
116
print ("Triggering action..." )
@@ -121,17 +127,20 @@ def update_button_color(self):
121
127
self .blink_button .after (100 , lambda : self .blink_button .config (bg = "SystemButtonFace" ))
122
128
123
129
def quit_action (detector ):
130
+ """Handle the quit action for the GUI."""
124
131
print ("Quit button pressed. Exiting program." )
125
132
detector .stop_threads = True
126
133
detector .stop_detection ()
127
134
popup .quit ()
128
135
popup .destroy ()
129
136
130
137
def keystroke_action ():
138
+ """Perform the keystroke action (press spacebar)."""
131
139
print ("Spacebar pressed!" )
132
140
pyautogui .press ('space' )
133
141
134
142
def connect_start_stop_action (detector , connect_button ):
143
+ """Handle the connect/start/stop action for the GUI."""
135
144
if not detector .connected :
136
145
print ("Connect button pressed. Starting connection in a new thread." )
137
146
threading .Thread (target = connect_to_stream , args = (detector , connect_button ), daemon = True ).start ()
@@ -152,6 +161,7 @@ def connect_to_stream(detector, connect_button):
152
161
print ("Failed to connect to LSL stream." )
153
162
154
163
def create_popup ():
164
+ """Create the popup window for the EOG keystroke emulator."""
155
165
global popup
156
166
popup = tk .Tk ()
157
167
popup .geometry ("300x120" )
@@ -196,8 +206,8 @@ def move(event):
196
206
197
207
detector = EOGPeakDetector (blink_button , keystroke_action , connect_button )
198
208
199
- connect_button .config (command = lambda : connect_start_stop_action (detector , connect_button ))
200
- quit_button .config (command = lambda : quit_action (detector ))
209
+ connect_button .config (command = lambda : connect_start_stop_action (detector , connect_button )) # Connect/Start/Stop action
210
+ quit_button .config (command = lambda : quit_action (detector )) # Quit action
201
211
202
212
popup .mainloop ()
203
213
0 commit comments