17
17
You should have received a copy of the GNU Lesser General Public License
18
18
along with this program. If not, see <http://www.gnu.org/licenses/>.
19
19
'''
20
-
21
- import re
22
20
import sys
23
21
import cv2
24
22
import glob
27
25
from tkinter import filedialog
28
26
from scipy .stats import zscore
29
27
from scipy .fft import fft , ifft
30
- from cni_toolbox .auxiliary import two_gamma , gaussian
28
+ from cni_toolbox .gadgets import two_gamma
31
29
32
30
class pRF :
33
31
'''
@@ -99,16 +97,36 @@ def __init__(self, parameters, hrf = None):
99
97
self .l_hrf ) - 1 ,
100
98
self .p_sampling )
101
99
self .hrf_fft = fft (two_gamma (timepoints ), axis = 0 )
102
-
103
100
101
+ @staticmethod
102
+ def __gaussian__ (mu_x , mu_y , sigma , x , y ):
103
+ '''
104
+ Parameters
105
+ ----------
106
+ mu_x : float
107
+ center of Gaussian along x direction
108
+ mu_y : float
109
+ center of Gaussian along x direction
110
+ sigma : float
111
+ size of Gaussian
112
+ x : floating point array (1D)
113
+ x-coordinates
114
+ y : floating point array (1)
115
+ y-coordinates
116
+
117
+ Returns
118
+ -------
119
+ floating point array
120
+ '''
121
+
122
+ return np .exp ( - ((x - mu_x )** 2 + (y - mu_y )** 2 ) / (2 * sigma ** 2 ) )
104
123
105
124
def get_hrf (self ):
106
125
'''
107
126
Returns
108
127
-------
109
128
hrf : floating point array
110
129
hemodynamic response function(s) used by the class
111
-
112
130
'''
113
131
if self .hrf_fft .ndim > 1 :
114
132
hrf = ifft (np .zqueeze (
@@ -178,14 +196,12 @@ def set_stimulus(self, stimulus):
178
196
179
197
def import_stimulus (self ):
180
198
181
- stimulus_directory = '' .join (tk .filedialog .askdirectory (
182
- title = 'Please select the stimulus directory' ))
183
199
root = tk .Tk ()
200
+ stimulus_directory = '' .join (filedialog .askdirectory (
201
+ title = 'Please select the stimulus directory' ))
184
202
root .destroy ()
185
203
186
204
files = glob .glob ('%s/*.png' % stimulus_directory )
187
- l = re .search (r"\d" , files [0 ]).start ()
188
- prefix = files [0 ][0 :l ]
189
205
190
206
self .stimulus = np .zeros ((self .h_stimulus ,
191
207
self .w_stimulus ,
@@ -195,13 +211,206 @@ def import_stimulus(self):
195
211
number = int ('' .join ([str (s ) for s in f if s .isdigit ()]))
196
212
img = cv2 .imread (f )
197
213
self .stimulus [:, :, number ] = img [:, :, 0 ]
198
-
199
- i = int ( idx / self .n_samples * 21 )
200
- sys . stdout . write ( ' \r ' )
201
- sys . stdout . write ( "loading stimulus [%-20s] %d%%"
202
- % ( '=' * i , 5 * i ))
203
-
214
+
215
+ mn = np . min ( self .stimulus )
216
+ mx = np . max ( self . stimulus )
217
+ self . stimulus = ( self . stimulus - mn ) / ( mx - mn )
218
+
219
+
204
220
self .stimulus = np .reshape (self .stimulus ,
205
221
(self .h_stimulus * self .w_stimulus ,
206
222
self .n_samples + self .l_hrf ))
223
+
224
+ def create_timecourses (self , max_radius = 10.0 , n_xy = 30 ,
225
+ min_slope = 0.1 , max_slope = 1.2 ,
226
+ n_slope = 10 ):
227
+ '''
228
+ creates predicted timecourses based on the effective stimulus
229
+ and a range of isotropic receptive fields.
230
+ Isotropic receptive fields are generated for a grid of
231
+ location (x,y) and size parameters.
232
+
233
+ optional inputs are
234
+ - max_radius: float
235
+ radius of the field of fiew (default = 10.0)
236
+ - n_xy: integer
237
+ steps in x and y direction (default = 30)
238
+ - min_slope: float
239
+ lower bound of RF size slope (default = 0.1)
240
+ - max_slope: float
241
+ upper bound of RF size slope (default = 1.2)
242
+ - n_slope: integer
243
+ steps from lower to upper bound (default = 10)
244
+ '''
245
+
246
+ self .n_points = (n_xy ** 2 ) * n_slope
247
+
248
+ h_ones = np .ones (self .h_stimulus )
249
+ w_ones = np .ones (self .w_stimulus )
250
+ h_values = np .linspace (- max_radius , max_radius , self .h_stimulus )
251
+ w_values = np .linspace (- max_radius , max_radius , self .w_stimulus )
252
+ x_coordinates = np .outer (h_ones , w_values )
253
+ y_coordinates = np .outer (h_values , w_ones )
254
+
255
+ x_coordinates = np .reshape (x_coordinates , self .w_stimulus * self .h_stimulus )
256
+ y_coordinates = np .reshape (y_coordinates , self .w_stimulus * self .h_stimulus )
257
+
258
+ idx_all = np .arange (self .n_points )
259
+ self .idx = np .array ([idx_all // (n_slope * n_xy ),
260
+ (idx_all // n_slope ) % n_xy ,
261
+ idx_all % n_slope ]).transpose ()
262
+
263
+ n_lower = int (np .ceil (n_xy / 2 ))
264
+ n_upper = int (np .floor (n_xy / 2 ))
265
+ self .ecc = np .exp (np .hstack (
266
+ [np .linspace (np .log (max_radius ), np .log (.1 ), n_lower ),
267
+ np .linspace (np .log (.1 ), np .log (max_radius ), n_upper )]))
268
+ self .pa = np .linspace (0 , (n_xy - 1 ) / n_xy * 2 * np .pi , n_xy )
269
+ self .slope = np .linspace (min_slope , max_slope , n_slope )
270
+
271
+ W = np .zeros ((self .n_points ,
272
+ self .w_stimulus * self .h_stimulus ))
273
+ for p in range (self .n_points ):
274
+ x = np .cos (self .pa [self .idx [p , 0 ]]) * self .ecc [self .idx [p , 1 ]]
275
+ y = np .sin (self .pa [self .idx [p , 0 ]]) * self .ecc [self .idx [p , 1 ]]
276
+ sigma = self .ecc [self .idx [p , 1 ]] * self .slope [self .idx [p , 2 ]]
277
+ W [p , :] = self .__gaussian__ (x , y , sigma , x_coordinates ,y_coordinates )
278
+
279
+ i = int (p / self .n_points * 19 )
280
+ sys .stdout .write ('\r ' )
281
+ sys .stdout .write ("creating timecourses [%-20s] %d%%"
282
+ % ('=' * i , 5 * i ))
283
+
284
+ tc = np .matmul (W , self .stimulus ).transpose ()
285
+ sys .stdout .write ('\r ' )
286
+ sys .stdout .write ("creating timecourses [%-20s] %d%%\n " % ('=' * 20 , 100 ))
287
+ self .tc_fft = fft (tc , axis = 0 )
288
+
289
+ def mapping (self , data , threshold = 100 , mask = None ):
290
+ '''
291
+ identifies the best fitting timecourse for each voxel and
292
+ returns a dictionary with the following keys
293
+ - corr_fit
294
+ - mu_x
295
+ - mu_y
296
+ - sigma
297
+ - eccentricity
298
+ - polar_angle
299
+
300
+ The dimension of each field corresponds to the dimensions
301
+ of the data.
302
+
303
+ Required inputs are
304
+ - data : floating point array
305
+ empirically observed BOLD timecourses
306
+ whose rows correspond to time (volumes).
307
+
308
+
309
+ Optional inputs are
310
+ - threshold: float
311
+ minimum voxel intensity (default = 100.0)
312
+ - mask: boolean array
313
+ binary mask for selecting voxels (default = None)
314
+ '''
315
+ data = np .reshape (data .astype (float ),
316
+ (self .n_samples ,
317
+ self .n_total ))
318
+
319
+ mean_signal = np .mean (data , axis = 0 )
320
+ data = zscore (data , axis = 0 )
321
+
322
+ if mask == None :
323
+ mask = mean_signal >= threshold
324
+
325
+ mask = np .reshape (mask ,self .n_total )
326
+ voxel_index = np .where (mask )[0 ]
327
+ n_voxels = voxel_index .size
328
+
329
+ mag_d = np .sqrt (np .sum (data [:, mask ]** 2 , axis = 0 ))
330
+
331
+
332
+ results = {'corr_fit' : np .zeros (self .n_total ),
333
+ 'mu_x' : np .zeros (self .n_total ),
334
+ 'mu_y' : np .zeros (self .n_total ),
335
+ 'sigma' : np .zeros (self .n_total )}
336
+
337
+ if self .hrf_fft .ndim == 1 :
338
+ tc = np .transpose (
339
+ zscore (
340
+ np .abs (
341
+ ifft (self .tc_fft *
342
+ np .expand_dims (self .hrf_fft ,
343
+ axis = 1 ), axis = 0 )), axis = 0 ))
344
+ tc = tc [:, 0 :self .n_samples ]
345
+ mag_tc = np .sqrt (np .sum (tc ** 2 , axis = 1 ))
346
+ for m in range (n_voxels ):
347
+ v = voxel_index [m ]
348
+
349
+ CS = np .matmul (tc , data [:, v ]) / (mag_tc * mag_d [m ])
350
+ idx_remove = (CS == np .Inf )| (CS == np .NaN );
351
+ CS [idx_remove ] = 0
352
+
353
+ results ['corr_fit' ][v ] = np .max (CS )
354
+ idx_best = np .argmax (CS )
355
+
356
+ results ['mu_x' ][v ] = np .cos (self .pa [self .idx [idx_best , 0 ]]) * \
357
+ self .ecc [self .idx [idx_best , 1 ]]
358
+ results ['mu_y' ][v ] = np .sin (self .pa [self .idx [idx_best , 0 ]]) * \
359
+ self .ecc [self .idx [idx_best , 1 ]]
360
+ results ['sigma' ][v ] = self .ecc [self .idx [idx_best , 1 ]] * \
361
+ self .slope [self .idx [idx_best , 2 ]]
362
+
363
+ i = int (m / n_voxels * 21 )
364
+ sys .stdout .write ('\r ' )
365
+ sys .stdout .write ("pRF mapping [%-20s] %d%%"
366
+ % ('=' * i , 5 * i ))
367
+
368
+ else :
369
+ for m in range (n_voxels ):
370
+ v = voxel_index [m ]
371
+
372
+ tc = np .transpose (
373
+ zscore (
374
+ np .abs (
375
+ ifft (self .tc_fft *
376
+ np .expand_dims (self .hrf_fft [:, v ],
377
+ axis = 1 ), axis = 0 )), axis = 0 ))
378
+
379
+ tc = tc [:, 0 :self .n_samples ]
380
+ mag_tc = np .sqrt (np .sum (tc ** 2 , axis = 1 ))
381
+
382
+ CS = np .matmul (tc , data [:, v ]) / (mag_tc * mag_d [m ])
383
+ idx_remove = (CS == np .Inf ) | (CS == np .NaN )
384
+ CS [idx_remove ] = 0
207
385
386
+ results ['corr_fit' ][v ] = np .max (CS )
387
+ idx_best = np .argmax (CS )
388
+
389
+ results ['mu_x' ][v ] = np .cos (self .pa [self .idx [idx_best , 0 ]]) * \
390
+ self .ecc [self .idx [idx_best , 1 ]]
391
+ results ['mu_y' ][v ] = np .sin (self .pa [self .idx [idx_best , 0 ]]) * \
392
+ self .ecc [self .idx [idx_best , 1 ]]
393
+ results ['sigma' ][v ] = self .ecc [self .idx [idx_best , 1 ]] * \
394
+ self .slope [self .idx [idx_best , 2 ]]
395
+
396
+ i = int (m / n_voxels * 21 )
397
+ sys .stdout .write ('\r ' )
398
+ sys .stdout .write ("pRF mapping [%-20s] %d%%"
399
+ % ('=' * i , 5 * i ))
400
+
401
+
402
+ for key in results :
403
+ results [key ] = np .squeeze (
404
+ np .reshape (results [key ],
405
+ (self .n_rows ,
406
+ self .n_cols ,
407
+ self .n_slices )))
408
+
409
+ results ['eccentricity' ] = np .abs (results ['mu_x' ] +
410
+ results ['mu_y' ] * 1j )
411
+ results ['polar_angle' ] = np .angle (results ['mu_x' ] +
412
+ results ['mu_y' ] * 1j )
413
+
414
+ return results
415
+
416
+
0 commit comments