10
10
import time
11
11
12
12
# Constants
13
- FFT_WINDOW_SIZE = 512
14
- SMOOTHING_WINDOW_SIZE = 10
15
- DISPLAY_DURATION = 4 # seconds
16
- BAND_RANGES = {'delta' : (0.5 , 4 ), 'theta' : (4 , 8 ), 'alpha' : (8 , 12 ), 'beta' : (12 , 30 ), 'gamma' : (30 , 45 )}
13
+ FFT_WINDOW_SIZE = 512 # Data points we are using for fft analysis
14
+ # On increase this value - Frequency analysis becomes more accurate but updates slower
15
+ # On decrease this value - Updates faster but frequency details become less precise
16
+
17
+ SMOOTHING_WINDOW_SIZE = 10 # How many FFT results we average to make the display smoother
18
+ # On increase this value - Graph looks smoother but reacts slower to changes
19
+ # On decrease this value - Reacts faster but graph looks more jumpy
20
+
21
+ DISPLAY_DURATION = 4 # How many seconds of EEG data to show at once (in seconds)
17
22
18
23
class DataProcessor :
19
24
def __init__ (self , num_channels , sampling_rate ):
20
25
self .num_channels = num_channels
21
26
self .sampling_rate = sampling_rate
22
27
23
- # Initialize filters
28
+ # Filters - 1. A notch filter to remove electrical interference (50Hz noise) and A bandpass filter (0.5-45Hz)
24
29
self .b_notch , self .a_notch = iirnotch (50 , 30 , self .sampling_rate )
25
30
self .b_band , self .a_band = butter (4 , [0.5 / (self .sampling_rate / 2 ), 45.0 / (self .sampling_rate / 2 )], btype = 'band' )
26
31
self .zi_notch = [lfilter_zi (self .b_notch , self .a_notch ) * 0 for _ in range (num_channels )]
27
32
self .zi_band = [lfilter_zi (self .b_band , self .a_band ) * 0 for _ in range (num_channels )]
28
33
29
- # Initialize data buffers
34
+ # Circular buffers to store the last few seconds of EEG data
30
35
self .eeg_data = [np .zeros (DISPLAY_DURATION * sampling_rate ) for _ in range (num_channels )]
31
- self .current_indices = [0 for _ in range (num_channels )]
32
- self .moving_windows = [deque (maxlen = FFT_WINDOW_SIZE ) for _ in range (num_channels )]
36
+ self .current_indices = [0 for _ in range (num_channels )] # Pointers to know where to put new data
37
+ self .moving_windows = [deque (maxlen = FFT_WINDOW_SIZE ) for _ in range (num_channels )] # 3. Moving windows for FFT calculation
33
38
34
39
def process_sample (self , sample ):
35
40
filtered_data = []
36
41
for ch in range (self .num_channels ):
37
- raw_point = sample [ch ]
42
+ raw_point = sample [ch ] # Get the raw EEG value
38
43
39
44
# Apply filters
40
45
notch_filtered , self .zi_notch [ch ] = lfilter (self .b_notch , self .a_notch , [raw_point ], zi = self .zi_notch [ch ])
41
46
band_filtered , self .zi_band [ch ] = lfilter (self .b_band , self .a_band , notch_filtered , zi = self .zi_band [ch ])
42
- band_filtered = band_filtered [- 1 ]
47
+ band_filtered = band_filtered [- 1 ] # Get the final filtered value
43
48
44
49
# Update EEG data buffer
45
50
self .eeg_data [ch ][self .current_indices [ch ]] = band_filtered
@@ -60,10 +65,10 @@ def __init__(self, num_channels, sampling_rate):
60
65
self .num_channels = num_channels
61
66
self .sampling_rate = sampling_rate
62
67
63
- # Frequency bins
68
+ # Calculate all the frequency bins
64
69
self .freqs = np .fft .rfftfreq (FFT_WINDOW_SIZE , d = 1.0 / self .sampling_rate )
65
70
self .freq_resolution = self .sampling_rate / FFT_WINDOW_SIZE
66
- self .fft_window = np .hanning (FFT_WINDOW_SIZE )
71
+ self .fft_window = np .hanning (FFT_WINDOW_SIZE ) # Create a window function to make the FFT more accurate
67
72
self .window_correction = np .sum (self .fft_window ) # For amplitude scaling
68
73
69
74
# Smoothing buffers
@@ -77,7 +82,7 @@ def compute_fft(self, channel, time_data):
77
82
if len (time_data ) < FFT_WINDOW_SIZE :
78
83
return None , None
79
84
80
- # Extract the most recent FFT_WINDOW_SIZE samples
85
+ # Extract the most recent EEG Data ( FFT_WINDOW_SIZE samples)
81
86
signal_chunk = np .array (time_data [- FFT_WINDOW_SIZE :], dtype = np .float64 )
82
87
windowed_signal = signal_chunk * self .fft_window
83
88
fft_result = np .fft .rfft (windowed_signal )
@@ -101,8 +106,8 @@ def compute_fft(self, channel, time_data):
101
106
102
107
def calculate_band_power (self , fft_magnitudes , freq_range ):
103
108
low , high = freq_range
104
- mask = (self .freqs [1 :] >= low ) & (self .freqs [1 :] <= high )
105
- return np .sum (fft_magnitudes [mask ] ** 2 ) # Total power in band
109
+ mask = (self .freqs [1 :] >= low ) & (self .freqs [1 :] <= high ) # Find which frequencies are in our desired range
110
+ return np .sum (fft_magnitudes [mask ] ** 2 ) # Sum up the power in this range
106
111
107
112
def compute_band_powers (self , channel , time_data ):
108
113
freqs , fft_mag = self .compute_fft (channel , time_data )
@@ -118,10 +123,6 @@ def compute_band_powers(self, channel, time_data):
118
123
119
124
total_power = delta + theta + alpha + beta + gamma
120
125
121
- # Avoid division by zero
122
- if total_power < 1e-10 :
123
- return {band : 0.0 for band in BAND_RANGES }
124
-
125
126
# Return relative powers
126
127
return {'delta' : delta / total_power ,'theta' : theta / total_power ,'alpha' : alpha / total_power ,'beta' : beta / total_power ,'gamma' : gamma / total_power }
127
128
0 commit comments