1
1
'''
2
2
-----------------------------------------------------------------------------
3
- LICENSE
3
+ LICENSE
4
4
5
5
Copyright 2020 Mario Senden
6
6
27
27
class IRM :
28
28
'''
29
29
Input-referred model (IRM) mapping tool.
30
-
30
+
31
31
irm = IRM(parameters) creates an instance of the IRM class.
32
32
parameters is a dictionary with 5 required keys
33
33
- f_sampling: sampling frequency (1/TR)
34
34
- n_samples : number of samples (volumes)
35
35
- n_rows : number of rows (in-plane resolution)
36
36
- n_cols : number of columns (in-plance resolution)
37
37
- n_slices : number of slices
38
-
38
+
39
39
optional inputs are
40
40
- hrf : either a column vector containing a single hemodynamic
41
41
response used for every voxel;
42
42
or a matrix with a unique hemodynamic response along
43
43
its columns for each voxel.
44
44
By default the canonical two-gamma hemodynamic response
45
45
function is generated internally based on the scan parameters.
46
-
46
+
47
47
This class has the following functions
48
-
48
+
49
49
- hrf = IRM.get_hrf()
50
50
- stimulus = IRM.get_stimulus()
51
51
- tc = IRM.get_timecourses()
52
52
- IRM.set_hrf(hrf)
53
53
- IRM.set_stimulus(stimulus)
54
54
- IRM.create_timecourses()
55
55
- results = IRM.mapping(data)
56
-
56
+
57
57
58
58
Typical workflow:
59
59
1. irm = IRM(params)
60
60
2. irm.set_stimulus()
61
61
3. irm.create_timecourse(FUN,xdata)
62
62
4. results = irm.mapping(data)
63
63
'''
64
-
64
+
65
65
def __init__ (self , parameters , hrf = None ):
66
66
self .f_sampling = parameters ['f_sampling' ]
67
67
self .p_sampling = 1 / self .f_sampling
@@ -70,7 +70,7 @@ def __init__(self, parameters, hrf = None):
70
70
self .n_cols = parameters ['n_cols' ]
71
71
self .n_slices = parameters ['n_slices' ]
72
72
self .n_total = self .n_rows * self .n_cols * self .n_slices
73
-
73
+
74
74
if hrf != None :
75
75
self .l_hrf = hrf .shape [0 ]
76
76
if hrf .ndim > 2 :
@@ -80,23 +80,23 @@ def __init__(self, parameters, hrf = None):
80
80
self .n_total )))),
81
81
axis = 0 )
82
82
else :
83
- self .hrf_fft = fft (np .append (hrf ,
83
+ self .hrf_fft = fft (np .append (hrf ,
84
84
np .zeros (self .n_samples )),
85
85
axis = 0 )
86
86
else :
87
87
self .l_hrf = int (32 * self .f_sampling )
88
- timepoints = np .arange (0 ,
88
+ timepoints = np .arange (0 ,
89
89
self .p_sampling * (self .n_samples +
90
- self .l_hrf ) - 1 ,
90
+ self .l_hrf ) - 1 ,
91
91
self .p_sampling )
92
92
self .hrf_fft = fft (two_gamma (timepoints ), axis = 0 )
93
-
94
-
93
+
94
+
95
95
def get_hrf (self ):
96
96
'''
97
97
Returns
98
98
-------
99
- hrf : floating point array
99
+ hrf: floating point array
100
100
hemodynamic response function(s) used by the class
101
101
'''
102
102
if self .hrf_fft .ndim > 1 :
@@ -109,34 +109,34 @@ def get_hrf(self):
109
109
axis = 0 )[0 :self .l_hrf , :]
110
110
else :
111
111
hrf = ifft (self .hrf_fft , axis = 0 )[0 :self .l_hrf ]
112
-
112
+
113
113
return np .abs (hrf )
114
-
114
+
115
115
def get_stimulus (self ):
116
116
'''
117
117
Returns
118
118
-------
119
119
floating point array (1D)
120
120
stimulus used by the class
121
121
'''
122
-
122
+
123
123
return self .stimulus
124
-
124
+
125
125
def get_timecourses (self ):
126
126
'''
127
127
Returns
128
128
-------
129
129
floating point array (time-by-gridsize)
130
130
predicted timecourses
131
131
'''
132
-
132
+
133
133
return np .abs (ifft (self .tc_fft , axis = 0 )[0 :self .n_samples , :])
134
-
134
+
135
135
def set_hrf (self , hrf ):
136
136
'''
137
137
Parameters
138
138
----------
139
- hrf : floating point array
139
+ hrf: floating point array
140
140
hemodynamic response function
141
141
'''
142
142
self .l_hrf = hrf .shape [0 ]
@@ -147,100 +147,100 @@ def set_hrf(self, hrf):
147
147
self .n_total )))),
148
148
axis = 0 )
149
149
else :
150
- self .hrf_fft = fft (np .append (hrf ,
150
+ self .hrf_fft = fft (np .append (hrf ,
151
151
np .zeros (self .n_samples )),
152
152
axis = 0 )
153
-
153
+
154
154
def set_stimulus (self , stimulus ):
155
155
'''
156
-
157
156
Parameters
158
157
----------
159
- stimulus : floating point array (1D)
158
+ stimulus: floating point array (1D)
160
159
stimulus used by the class
161
-
162
160
'''
163
161
self .stimulus = stimulus
164
-
162
+
165
163
def create_timecourses (self , FUN , xdata ):
166
- '''
164
+ '''
167
165
creates predicted timecourses based on the stimulus protocol
168
166
and a range of parameters for an input referred model.
169
167
170
168
Required inputs are
171
- - FUN : function handle
172
- defining the input referred model
169
+ - FUN: function handle
170
+ defining the input referred model
173
171
- xdata: dictionary with p elements (p = number of parameters).
174
- Each cell element needs to contain a column vector
175
- of variable length with parameter values to be explored.
176
- '''
177
-
172
+ Each cell element needs to contain a column vector
173
+ of variable length with parameter values to be explored.
174
+ '''
175
+ print ('\n creating timecourses' )
176
+
178
177
self .xdata = xdata
179
178
self .n_predictors = len (xdata )
179
+
180
180
n_observations = np .zeros (self .n_predictors )
181
181
for idx , key in enumerate (self .xdata ):
182
182
n_observations [idx ] = np .size (self .xdata [key ])
183
-
184
183
self .n_points = np .prod (n_observations ).astype (int )
185
-
184
+
186
185
idx_all = np .arange (self .n_points )
187
186
self .idx = np .zeros ((self .n_points , self .n_predictors ))
188
-
189
187
for p in range (self .n_predictors ):
190
188
self .idx [:, p ] = (idx_all // (np .prod (n_observations [p + 1 ::]))) \
191
189
% n_observations [p ]
192
- self .idx = self .idx .astype (int )
190
+ self .idx = self .idx .astype (int )
191
+
193
192
tc = np .zeros ((self .n_samples + self .l_hrf ,
194
193
self .n_points ))
194
+
195
195
x = np .zeros (self .n_predictors )
196
- print ( ' \n creating timecourses' )
196
+
197
197
for j in range (self .n_points ):
198
198
for p , key in enumerate (self .xdata ):
199
199
x [p ] = self .xdata [key ][self .idx [j , p ]]
200
200
tc [0 :self .n_samples , j ] = FUN (self .stimulus , x )
201
201
i = int (j / self .n_points * 21 )
202
202
sys .stdout .write ('\r ' )
203
- sys .stdout .write ("[%-20s] %d%%"
203
+ sys .stdout .write ("[%-20s] %d%%"
204
204
% ('=' * i , 5 * i ))
205
205
206
206
self .tc_fft = fft (tc , axis = 0 )
207
-
207
+
208
208
def mapping (self , data , threshold = 100 , mask = []):
209
209
'''
210
210
identifies the best fitting timecourse for each voxel and
211
- returns a dictionary with keyes corresponding to the
211
+ returns a dictionary with keys corresponding to the
212
212
parameters specified in xdata plus a key 'corr_fit'
213
213
storing the fitness of the solution.
214
214
215
215
Required inputs are
216
- - data : floating point array
216
+ - data: floating point array
217
217
empirically observed BOLD timecourses
218
218
whose rows correspond to time (volumes).
219
219
220
-
221
220
Optional inputs are
222
221
- threshold: float
223
222
minimum voxel intensity (default = 100.0)
224
- - mask: boolean array
223
+ - mask: boolean array
225
224
binary mask for selecting voxels (default = None)
226
225
'''
227
- data = np .reshape (data .astype (float ),
226
+ print ('\n mapping model parameters' )
227
+
228
+ data = np .reshape (data .astype (float ),
228
229
(self .n_samples ,
229
230
self .n_total ))
230
-
231
+
231
232
mean_signal = np .mean (data , axis = 0 )
232
233
data = zscore (data , axis = 0 )
233
-
234
+
234
235
if np .size (mask )== 0 :
235
236
mask = mean_signal >= threshold
236
237
237
238
mask = np .reshape (mask ,self .n_total )
238
239
voxel_index = np .where (mask )[0 ]
239
240
n_voxels = voxel_index .size
240
-
241
+
241
242
mag_d = np .sqrt (np .sum (data [:, mask ]** 2 , axis = 0 ))
242
-
243
-
243
+
244
244
results = {'corr_fit' : np .zeros (self .n_total )}
245
245
for key in self .xdata :
246
246
results [key ] = np .zeros (self .n_total )
@@ -254,60 +254,60 @@ def mapping(self, data, threshold = 100, mask = []):
254
254
axis = 1 ), axis = 0 )), axis = 0 ))
255
255
tc = tc [:, 0 :self .n_samples ]
256
256
mag_tc = np .sqrt (np .sum (tc ** 2 , axis = 1 ))
257
- print ( ' \n mapping model parameters' )
257
+
258
258
for m in range (n_voxels ):
259
259
v = voxel_index [m ]
260
-
260
+
261
261
CS = np .matmul (tc , data [:, v ]) / (mag_tc * mag_d [m ])
262
262
idx_remove = (CS == np .Inf )| (CS == np .NaN );
263
263
CS [idx_remove ] = 0
264
-
264
+
265
265
results ['corr_fit' ][v ] = np .max (CS )
266
266
idx_best = np .argmax (CS )
267
-
267
+
268
268
for pos , key in enumerate (self .xdata ):
269
269
results [key ][v ] = self .xdata [key ][self .idx [idx_best , pos ]]
270
270
271
271
i = int (m / n_voxels * 21 )
272
272
sys .stdout .write ('\r ' )
273
- sys .stdout .write ("[%-20s] %d%%"
273
+ sys .stdout .write ("[%-20s] %d%%"
274
274
% ('=' * i , 5 * i ))
275
-
275
+
276
276
else :
277
277
for m in range (n_voxels ):
278
278
v = voxel_index [m ]
279
-
279
+
280
280
tc = np .transpose (
281
281
zscore (
282
282
np .abs (
283
283
ifft (self .tc_fft *
284
284
np .expand_dims (self .hrf_fft [:, v ],
285
285
axis = 1 ), axis = 0 )), axis = 0 ))
286
-
286
+
287
287
tc = tc [:, 0 :self .n_samples ]
288
288
mag_tc = np .sqrt (np .sum (tc ** 2 , axis = 1 ))
289
-
289
+
290
290
CS = np .matmul (tc , data [:, v ]) / (mag_tc * mag_d [m ])
291
291
idx_remove = (CS == np .Inf )| (CS == np .NaN );
292
292
CS [idx_remove ] = 0
293
-
293
+
294
294
results ['corr_fit' ][v ] = np .max (CS )
295
295
idx_best = np .argmax (CS )
296
-
296
+
297
297
for pos , key in enumerate (self .xdata ):
298
298
results [key ][v ] = self .xdata [key ][self .idx [idx_best , pos ]]
299
299
300
300
i = int (m / n_voxels * 21 )
301
301
sys .stdout .write ('\r ' )
302
- sys .stdout .write ("[%-20s] %d%%"
302
+ sys .stdout .write ("[%-20s] %d%%"
303
303
% ('=' * i , 5 * i ))
304
-
305
-
304
+
305
+
306
306
for key in results :
307
307
results [key ] = np .squeeze (
308
308
np .reshape (results [key ],
309
309
(self .n_rows ,
310
310
self .n_cols ,
311
311
self .n_slices )))
312
-
312
+
313
313
return results
0 commit comments