1
+ '''
2
+ -----------------------------------------------------------------------------
3
+ LICENSE
4
+
5
+ Copyright 2020 Mario Senden
6
+
7
+ This program is free software: you can redistribute it and/or modify
8
+ it under the terms of the GNU Lesser General Public License as published
9
+ by the Free Software Foundation, either version 3 of the License, or
10
+ at your option) any later version.
11
+
12
+ This program is distributed in the hope that it will be useful,
13
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
14
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
+ GNU Lesser General Public License for more details.
16
+
17
+ You should have received a copy of the GNU Lesser General Public License
18
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
19
+ '''
20
+
21
+ import sys
22
+ import numpy as np
23
+ from scipy .stats import zscore
24
+ from scipy .fft import fft , ifft
25
+ from cni_toolbox .gadgets import two_gamma
26
+
27
+ class IRM :
28
+ '''
29
+ Input-referred model (IRM) mapping tool.
30
+
31
+ irm = IRM(params) creates an instance of the IRM class.
32
+ parameters is a dictionary with 5 required keys
33
+ - f_sampling: sampling frequency (1/TR)
34
+ - n_samples : number of samples (volumes)
35
+ - n_rows : number of rows (in-plane resolution)
36
+ - n_cols : number of columns (in-plance resolution)
37
+ - n_slices : number of slices
38
+
39
+ optional inputs are
40
+ - hrf : either a column vector containing a single hemodynamic
41
+ response used for every voxel;
42
+ or a matrix with a unique hemodynamic response along
43
+ its columns for each voxel.
44
+ By default the canonical two-gamma hemodynamic response
45
+ function is generated internally based on the scan parameters.
46
+
47
+ This class has the following functions
48
+
49
+ - hrf = IRM.get_hrf()
50
+ - stimulus = IRM.get_stimulus()
51
+ - tc = IRM.get_timecourses()
52
+ - IRM.set_hrf(hrf)
53
+ - IRM.set_stimulus(stimulus)
54
+ - IRM.create_timecourses()
55
+ - results = IRM.mapping(data)
56
+
57
+
58
+ Typical workflow:
59
+ 1. irm = IRM(params)
60
+ 2. irm.set_stimulus()
61
+ 3. irm.create_timecourse(FUN,xdata)
62
+ 4. results = irm.mapping(data)
63
+ '''
64
+
65
+ def __init__ (self , parameters , hrf = None ):
66
+ self .f_sampling = parameters ['f_sampling' ]
67
+ self .p_sampling = 1 / self .f_sampling
68
+ self .n_samples = parameters ['n_samples' ]
69
+ self .n_rows = parameters ['n_rows' ]
70
+ self .n_cols = parameters ['n_cols' ]
71
+ self .n_slices = parameters ['n_slices' ]
72
+ self .n_total = self .n_rows * self .n_cols * self .n_slices
73
+
74
+ if hrf != None :
75
+ self .l_hrf = hrf .shape [0 ]
76
+ if hrf .ndim > 2 :
77
+ hrf = np .reshape (hrf , (self .l_hrf , self .n_total ))
78
+ self .hrf_fft = fft (np .vstack ((hrf ,
79
+ np .zeros ((self .n_samples ,
80
+ self .n_total )))),
81
+ axis = 0 )
82
+ else :
83
+ self .hrf_fft = fft (np .append (hrf ,
84
+ np .zeros (self .n_samples )),
85
+ axis = 0 )
86
+ else :
87
+ self .l_hrf = int (32 * self .f_sampling )
88
+ timepoints = np .arange (0 ,
89
+ self .p_sampling * (self .n_samples +
90
+ self .l_hrf ) - 1 ,
91
+ self .p_sampling )
92
+ self .hrf_fft = fft (two_gamma (timepoints ), axis = 0 )
93
+
94
+
95
+ def get_hrf (self ):
96
+ '''
97
+ Returns
98
+ -------
99
+ hrf : floating point array
100
+ hemodynamic response function(s) used by the class
101
+ '''
102
+ if self .hrf_fft .ndim > 1 :
103
+ hrf = ifft (np .zqueeze (
104
+ np .reshape (self .hrf ,
105
+ (self .l_hrf ,
106
+ self .n_rows ,
107
+ self .n_cols ,
108
+ self .n_slices ))),
109
+ axis = 0 )[0 :self .l_hrf , :]
110
+ else :
111
+ hrf = ifft (self .hrf_fft , axis = 0 )[0 :self .l_hrf ]
112
+
113
+ return hrf
114
+
115
+ def get_stimulus (self ):
116
+ '''
117
+ Returns
118
+ -------
119
+ floating point array (1D)
120
+ stimulus used by the class
121
+ '''
122
+
123
+ return self .stimulus
124
+
125
+ def get_timecourses (self ):
126
+ '''
127
+ Returns
128
+ -------
129
+ floating point array (time-by-gridsize)
130
+ predicted timecourses
131
+ '''
132
+
133
+ return ifft (self .tc_fft , axis = 0 )[0 :self .n_samples , :]
134
+
135
+ def set_hrf (self , hrf ):
136
+ '''
137
+ Parameters
138
+ ----------
139
+ hrf : floating point array
140
+ hemodynamic response function
141
+ '''
142
+ self .l_hrf = hrf .shape [0 ]
143
+ if hrf .ndim > 2 :
144
+ hrf = np .reshape (hrf , (self .l_hrf , self .n_total ))
145
+ self .hrf_fft = fft (np .vstack ((hrf ,
146
+ np .zeros ((self .n_samples ,
147
+ self .n_total )))),
148
+ axis = 0 )
149
+ else :
150
+ self .hrf_fft = fft (np .append (hrf ,
151
+ np .zeros (self .n_samples )),
152
+ axis = 0 )
153
+
154
+ def set_stimulus (self , stimulus ):
155
+ '''
156
+
157
+ Parameters
158
+ ----------
159
+ stimulus : floating point array (1D)
160
+ stimulus used by the class
161
+
162
+ '''
163
+ self .stimulus = stimulus
164
+
165
+ def create_timecourses (self , FUN , xdata ):
166
+ '''
167
+ creates predicted timecourses based on the stimulus protocol
168
+ and a range of parameters for an input referred model.
169
+
170
+ Required inputs are
171
+ - FUN : function handle
172
+ defining the input referred model
173
+ - xdata: dictionary with p elements (p = number of parameters).
174
+ Each cell element needs to contain a column vector
175
+ of variable length with parameter values to be explored.
176
+ '''
177
+
178
+ self .xdata = xdata
179
+ self .n_predictors = len (xdata )
180
+ n_observations = np .zeros (self .n_predictors )
181
+ for idx , key in enumerate (self .xdata ):
182
+ n_observations [idx ] = np .size (self .xdata [key ])
183
+
184
+ self .n_points = np .prod (n_observations )
185
+
186
+ idx_all = np .arange (self .n_points )
187
+ self .idx = np .zeros ((self .n_points , self .n_predictors ))
188
+
189
+ for p in range (self .n_predictors ):
190
+ self .idx [:, p ] = (idx_all // (np .prod (n_observations [p + 1 ::]))) \
191
+ % n_observations [p ]
192
+
193
+ tc = np .zeros ((self .n_samples + self .l_hrf ,
194
+ self .n_points ))
195
+ x = np .zeros (self .n_predictors )
196
+ for j in range (self .n_points ):
197
+ for p , key in enumerate (self .n_predictors ):
198
+ x [p ] = self .xdata [key ](self .idx [j , p ])
199
+ tc [0 :self .n_samples , j ] = FUN (self .stimulus , x )
200
+ i = int (j / self .n_points * 21 )
201
+ sys .stdout .write ('\r ' )
202
+ sys .stdout .write ("creating timecourses [%-20s] %d%%"
203
+ % ('=' * i , 5 * i ))
204
+
205
+ self .tc_fft = fft (tc , axis = 0 )
206
+
207
+ def mapping (self , data , threshold = 100 , mask = None ):
208
+ '''
209
+ identifies the best fitting timecourse for each voxel and
210
+ returns a dictionary with keyes corresponding to the
211
+ parameters specified in xdata plus a key 'corr_fit'
212
+ storing the fitness of the solution.
213
+
214
+ Required inputs are
215
+ - data : floating point array
216
+ empirically observed BOLD timecourses
217
+ whose rows correspond to time (volumes).
218
+
219
+
220
+ Optional inputs are
221
+ - threshold: float
222
+ minimum voxel intensity (default = 100.0)
223
+ - mask: boolean array
224
+ binary mask for selecting voxels (default = None)
225
+ '''
226
+ data = np .reshape (data .astype (float ),
227
+ (self .n_samples ,
228
+ self .n_total ))
229
+
230
+ mean_signal = np .mean (data , axis = 0 )
231
+ data = zscore (data , axis = 0 )
232
+
233
+ if mask == None :
234
+ mask = mean_signal >= threshold
235
+
236
+ mask = np .reshape (mask ,self .n_total )
237
+ voxel_index = np .where (mask )[0 ]
238
+ n_voxels = voxel_index .size
239
+
240
+ mag_d = np .sqrt (np .sum (data [:, mask ]** 2 , axis = 0 ))
241
+
242
+
243
+ results = {'corr_fit' : np .zeros (self .n_total )}
244
+
245
+ if self .hrf_fft .ndim == 1 :
246
+ tc = np .transpose (
247
+ zscore (
248
+ np .abs (
249
+ ifft (self .tc_fft *
250
+ np .expand_dims (self .hrf_fft ,
251
+ axis = 1 ), axis = 0 )), axis = 0 ))
252
+ tc = tc [:, 0 :self .n_samples ]
253
+ mag_tc = np .sqrt (np .sum (tc ** 2 , axis = 1 ))
254
+ for m in range (n_voxels ):
255
+ v = voxel_index [m ]
256
+
257
+ CS = np .matmul (tc , data [:, v ]) / (mag_tc * mag_d [m ])
258
+ idx_remove = (CS == np .Inf )| (CS == np .NaN );
259
+ CS [idx_remove ] = 0
260
+
261
+ results ['corr_fit' ][v ] = np .max (CS )
262
+ idx_best = np .argmax (CS )
263
+
264
+ for pos , key in enumerate (self .xdata ):
265
+ results [key ][v ] = self .xdata [key ][self .idx [idx_best , pos ]]
266
+
267
+ i = int (m / n_voxels * 21 )
268
+ sys .stdout .write ('\r ' )
269
+ sys .stdout .write ("irm mapping [%-20s] %d%%"
270
+ % ('=' * i , 5 * i ))
271
+
272
+ else :
273
+ for m in range (n_voxels ):
274
+ v = voxel_index [m ]
275
+
276
+ tc = np .transpose (
277
+ zscore (
278
+ np .abs (
279
+ ifft (self .tc_fft *
280
+ np .expand_dims (self .hrf_fft [:, v ],
281
+ axis = 1 ), axis = 0 )), axis = 0 ))
282
+
283
+ tc = tc [:, 0 :self .n_samples ]
284
+ mag_tc = np .sqrt (np .sum (tc ** 2 , axis = 1 ))
285
+
286
+ CS = np .matmul (tc , data [:, v ]) / (mag_tc * mag_d [m ])
287
+ idx_remove = (CS == np .Inf )| (CS == np .NaN );
288
+ CS [idx_remove ] = 0
289
+
290
+ results ['corr_fit' ][v ] = np .max (CS )
291
+ idx_best = np .argmax (CS )
292
+
293
+ for pos , key in enumerate (self .xdata ):
294
+ results [key ][v ] = self .xdata [key ][self .idx [idx_best , pos ]]
295
+
296
+ i = int (m / n_voxels * 21 )
297
+ sys .stdout .write ('\r ' )
298
+ sys .stdout .write ("irm mapping [%-20s] %d%%"
299
+ % ('=' * i , 5 * i ))
300
+
301
+
302
+ for key in results :
303
+ results [key ] = np .squeeze (
304
+ np .reshape (results [key ],
305
+ (self .n_rows ,
306
+ self .n_cols ,
307
+ self .n_slices )))
308
+
309
+ return results
0 commit comments