Skip to content

Commit 2f48bba

Browse files
committed
Adding comments to keystroke.py
1 parent ead62f4 commit 2f48bba

File tree

1 file changed

+38
-28
lines changed

1 file changed

+38
-28
lines changed

keystroke.py

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,24 @@
99

1010
class EOGPeakDetector:
1111
def __init__(self, blink_button, keystroke_action, connect_button):
12-
self.inlet = None
13-
self.sampling_rate = None
14-
self.buffer_size = None
15-
self.eog_data = None
16-
self.current_index = 0
17-
self.b, self.a = None, None
18-
self.blink_button = blink_button
19-
self.keystroke_action = keystroke_action
20-
self.connect_button = connect_button
21-
self.blink_detected = False
22-
self.running = False
23-
self.connected = False
24-
self.last_blink_time = 0
25-
self.refractory_period = 0.2
26-
self.stop_threads = False
12+
self.inlet = None # LSL inlet for receiving data
13+
self.sampling_rate = None # Sampling rate of the data stream
14+
self.buffer_size = None # Size of the buffer for storing EOG data
15+
self.eog_data = None # Buffer for EOG data
16+
self.current_index = 0 # Current index in the buffer
17+
self.b, self.a = None, None # Filter coefficients for low-pass filter
18+
self.blink_button = blink_button # Button to trigger blink action
19+
self.keystroke_action = keystroke_action # Action to perform on blink detection
20+
self.connect_button = connect_button # Button to connect to LSL stream
21+
self.blink_detected = False # Flag to indicate if a blink has been detected
22+
self.running = False # Flag to control the detection loop
23+
self.connected = False # Flag to indicate if connected to LSL stream
24+
self.last_blink_time = 0 # Last time a blink was detected
25+
self.refractory_period = 0.2 # Refractory period to prevent multiple eye blink detections
26+
self.stop_threads = False # Flag to stop threads
2727

2828
def initialize_stream(self):
29+
"""Initialize the LSL stream connection and set up the buffer and filter coefficients."""
2930
print("Searching for available LSL streams...")
3031
available_streams = pylsl.resolve_streams()
3132

@@ -42,10 +43,10 @@ def initialize_stream(self):
4243
print(f"Sampling rate: {self.sampling_rate} Hz")
4344

4445
# Set buffer size and filter coefficients
45-
self.buffer_size = self.sampling_rate * 1
46-
self.eog_data = np.zeros(self.buffer_size)
47-
self.b, self.a = butter(4, 10.0 / (0.5 * self.sampling_rate), btype='low')
48-
self.connected = True
46+
self.buffer_size = self.sampling_rate * 1 # Buffer size for 1 second of data
47+
self.eog_data = np.zeros(self.buffer_size) # Initialize buffer for EOG data
48+
self.b, self.a = butter(4, 10.0 / (0.5 * self.sampling_rate), btype='low') # Low-pass filter coefficients
49+
self.connected = True # Set connected flag to True(LSL Stream connected)
4950
print("LSL stream connected successfully.")
5051
return True # Stop trying after first successful connection
5152

@@ -57,27 +58,31 @@ def initialize_stream(self):
5758
return False
5859

5960
def start_detection(self):
61+
"""Start the peak detection process"""
6062
print("Starting peak detection...")
61-
self.running = True
63+
self.running = True # Flag to control the detection loop
6264
while self.running:
6365
try:
6466
samples, _ = self.inlet.pull_chunk(timeout=1.0, max_samples=1)
6567
if samples:
6668
for sample in samples:
67-
self.eog_data[self.current_index] = sample[0]
68-
self.current_index = (self.current_index + 1) % self.buffer_size
69+
self.eog_data[self.current_index] = sample[0] # Store sample in circular buffer at current position
70+
self.current_index = (self.current_index + 1) % self.buffer_size # Update index with wrap-around using modulo
6971

7072
filtered_eog = lfilter(self.b, self.a, self.eog_data)
71-
self.detect_blinks(filtered_eog)
73+
self.detect_blinks(filtered_eog) # Run blink detection on the filtered signal
7274
except Exception as e:
7375
print(f"Error in detection: {e}")
7476
break
7577

7678
def stop_detection(self):
79+
"""Stop the peak detection process"""
7780
print("Stopping peak detection...")
78-
self.running = False
81+
self.running = False # Set running flag to False to stop the detection loop
7982

8083
def detect_blinks(self, filtered_eog):
84+
"""Detect blinks in the filtered EOG signal using a threshold-based method."""
85+
# Calculate dynamic threshold based on signal statistics
8186
mean_signal = np.mean(filtered_eog)
8287
stdev_signal = np.std(filtered_eog)
8388
threshold = mean_signal + (1.7 * stdev_signal)
@@ -87,8 +92,8 @@ def detect_blinks(self, filtered_eog):
8792
if start_index < 0:
8893
start_index = 0
8994

90-
filtered_window = filtered_eog[start_index:self.current_index]
91-
peaks = self.detect_peaks(filtered_window, threshold)
95+
filtered_window = filtered_eog[start_index:self.current_index] # Get the current window of filtered EOG data
96+
peaks = self.detect_peaks(filtered_window, threshold) # Detect peaks above threshold in the current window
9297

9398
current_time = time.time()
9499
if peaks and (current_time - self.last_blink_time > self.refractory_period):
@@ -105,6 +110,7 @@ def detect_peaks(self, signal, threshold):
105110
return peaks
106111

107112
def trigger_action(self):
113+
"""Trigger the keystroke action when a blink is detected."""
108114
if not self.blink_detected:
109115
self.blink_detected = True
110116
print("Triggering action...")
@@ -121,17 +127,20 @@ def update_button_color(self):
121127
self.blink_button.after(100, lambda: self.blink_button.config(bg="SystemButtonFace"))
122128

123129
def quit_action(detector):
130+
"""Handle the quit action for the GUI."""
124131
print("Quit button pressed. Exiting program.")
125132
detector.stop_threads = True
126133
detector.stop_detection()
127134
popup.quit()
128135
popup.destroy()
129136

130137
def keystroke_action():
138+
"""Perform the keystroke action (press spacebar)."""
131139
print("Spacebar pressed!")
132140
pyautogui.press('space')
133141

134142
def connect_start_stop_action(detector, connect_button):
143+
"""Handle the connect/start/stop action for the GUI."""
135144
if not detector.connected:
136145
print("Connect button pressed. Starting connection in a new thread.")
137146
threading.Thread(target=connect_to_stream, args=(detector, connect_button), daemon=True).start()
@@ -152,6 +161,7 @@ def connect_to_stream(detector, connect_button):
152161
print("Failed to connect to LSL stream.")
153162

154163
def create_popup():
164+
"""Create the popup window for the EOG keystroke emulator."""
155165
global popup
156166
popup = tk.Tk()
157167
popup.geometry("300x120")
@@ -196,8 +206,8 @@ def move(event):
196206

197207
detector = EOGPeakDetector(blink_button, keystroke_action, connect_button)
198208

199-
connect_button.config(command=lambda: connect_start_stop_action(detector, connect_button))
200-
quit_button.config(command=lambda: quit_action(detector))
209+
connect_button.config(command=lambda: connect_start_stop_action(detector, connect_button)) # Connect/Start/Stop action
210+
quit_button.config(command=lambda: quit_action(detector)) # Quit action
201211

202212
popup.mainloop()
203213

0 commit comments

Comments
 (0)