1
+ '''
2
+ -----------------------------------------------------------------------------
3
+ LICENSE
4
+
5
+ Copyright 2020 Mario Senden
6
+
7
+ This program is free software: you can redistribute it and/or modify
8
+ it under the terms of the GNU Lesser General Public License as published
9
+ by the Free Software Foundation, either version 3 of the License, or
10
+ at your option) any later version.
11
+
12
+ This program is distributed in the hope that it will be useful,
13
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
14
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
+ GNU Lesser General Public License for more details.
16
+
17
+ You should have received a copy of the GNU Lesser General Public License
18
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
19
+ '''
20
+
21
+ import sys
22
+ import numpy as np
23
+ from scipy .linalg import inv
24
+ from scipy .stats import zscore
25
+ from scipy .fft import fft , ifft
26
+ from cni_toolbox .gadgets import two_gamma , regress
27
+
28
+ class RRT :
29
+ '''
30
+ Ridge regression tool.
31
+
32
+ rrt = RRT(parameters) creates an instance of the RRT class.
33
+ parameters is a dictionary with 5 required keys
34
+ - f_sampling: sampling frequency (1/TR)
35
+ - n_samples : number of samples (volumes)
36
+ - n_rows : number of rows (in-plane resolution)
37
+ - n_cols : number of columns (in-plance resolution)
38
+ - n_slices : number of slices
39
+
40
+ optional inputs are
41
+ - hrf : either a column vector containing a single hemodynamic
42
+ response used for every voxel;
43
+ or a matrix with a unique hemodynamic response along
44
+ its columns for each voxel.
45
+ By default the canonical two-gamma hemodynamic response
46
+ function is generated internally based on the scan parameters.
47
+
48
+ This class has the following functions
49
+ %
50
+ - hrf = RRT.get_hrf()
51
+ - X = RRT.get_design()
52
+ - RRT.set_hrf(hrf)
53
+ - RRT.set_design(X)
54
+ - RRT.optimize_lambda(data,range)
55
+ - results = RRT.perform_ridge(data)
56
+
57
+ Typical workflow:
58
+ 1. rrt = RRT(params);
59
+ 2. rrt.set_design(X);
60
+ 3. rrt.optimize_lambda(data,range);
61
+ 4. results = rrt.perform_ridge(data);
62
+ '''
63
+
64
+ def __init__ (self , parameters , hrf = None ):
65
+ self .f_sampling = parameters ['f_sampling' ]
66
+ self .p_sampling = 1 / self .f_sampling
67
+ self .n_samples = parameters ['n_samples' ]
68
+ self .n_rows = parameters ['n_rows' ]
69
+ self .n_cols = parameters ['n_cols' ]
70
+ self .n_slices = parameters ['n_slices' ]
71
+ self .n_total = self .n_rows * self .n_cols * self .n_slices
72
+
73
+ if hrf != None :
74
+ self .l_hrf = hrf .shape [0 ]
75
+ if hrf .ndim > 2 :
76
+ hrf = np .reshape (hrf , (self .l_hrf , self .n_total ))
77
+ self .hrf_fft = fft (np .vstack ((hrf ,
78
+ np .zeros ((self .n_samples ,
79
+ self .n_total )))),
80
+ axis = 0 )
81
+ else :
82
+ self .hrf_fft = fft (np .append (hrf ,
83
+ np .zeros (self .n_samples )),
84
+ axis = 0 )
85
+ else :
86
+ self .l_hrf = int (32 * self .f_sampling )
87
+ timepoints = np .arange (0 ,
88
+ self .p_sampling * (self .n_samples +
89
+ self .l_hrf ) - 1 ,
90
+ self .p_sampling )
91
+ self .hrf_fft = fft (two_gamma (timepoints ), axis = 0 )
92
+
93
+
94
+ def get_hrf (self ):
95
+ '''
96
+ Returns
97
+ -------
98
+ hrf : floating point array
99
+ hemodynamic response function(s) used by the class
100
+ '''
101
+ if self .hrf_fft .ndim > 1 :
102
+ hrf = ifft (np .zqueeze (
103
+ np .reshape (self .hrf ,
104
+ (self .l_hrf ,
105
+ self .n_rows ,
106
+ self .n_cols ,
107
+ self .n_slices ))),
108
+ axis = 0 )[0 :self .l_hrf , :]
109
+ else :
110
+ hrf = ifft (self .hrf_fft , axis = 0 )[0 :self .l_hrf ]
111
+
112
+ return hrf
113
+
114
+
115
+ def get_design (self ):
116
+ '''
117
+ Returns
118
+ -------
119
+ floating point array
120
+ design matrix used by the class
121
+
122
+ '''
123
+
124
+ return self .X [0 :self .n_samples , :]
125
+
126
+ def set_hrf (self , hrf ):
127
+ '''
128
+ Parameters
129
+ ----------
130
+ hrf : floating point array
131
+ hemodynamic response function
132
+ '''
133
+ self .l_hrf = hrf .shape [0 ]
134
+ if hrf .ndim > 2 :
135
+ hrf = np .reshape (hrf , (self .l_hrf , self .n_total ))
136
+ self .hrf_fft = fft (np .vstack ((hrf ,
137
+ np .zeros ((self .n_samples ,
138
+ self .n_total )))),
139
+ axis = 0 )
140
+ else :
141
+ self .hrf_fft = fft (np .append (hrf ,
142
+ np .zeros (self .n_samples )),
143
+ axis = 0 )
144
+
145
+ def set_design (self , X , convolve = False ):
146
+ '''
147
+ provide a n-by-p design matrix to the class with n samples
148
+ and p predictors.
149
+
150
+ optional inputs is
151
+ - convolve: boolean
152
+ indicates whether the design matrix needs to be convolved
153
+ with the hemodynamic response function (default = False)
154
+ '''
155
+
156
+ if X .ndim > 1 :
157
+ self .n_predictors = X .shape [1 ]
158
+ else :
159
+ self .n_predictors = 1
160
+
161
+ if convolve :
162
+ X_fft = fft (np .vstack ((X ,
163
+ np .zeros ((self .l_hrf , self .n_predictors )))),
164
+ axis = 0 )
165
+ X = np .abs (ifft (X_fft *
166
+ np .expand_dims (self .hrf_fft ,
167
+ axis = 1 ), axis = 0 ))
168
+
169
+ self .X = zscore (X [0 :self .n_samples ,], axis = 0 )
170
+
171
+ def optimize_penalty (self , data , candidates , folds = 4 ,
172
+ mask = None ):
173
+ '''
174
+ performs k-fold cross-validation to find an optimal value
175
+ for the penalty parameter.
176
+
177
+ required inputs are
178
+ - data: floating point array
179
+ empirically observed BOLD timecourses
180
+ whose rows correspond to time (volumes).
181
+ - candidates: foating point array
182
+ candidate values for the penalty parameter
183
+
184
+ optional inputs are
185
+ - folds: integer
186
+ number of folds (k)
187
+ - mask: boolean array
188
+ binary mask for selecting voxels (default = None)
189
+
190
+ '''
191
+
192
+ if mask .all ()== None :
193
+ mask = np .ones (self .n_total ).astype (bool )
194
+ else :
195
+ mask = np .reshape (mask ,self .n_total )
196
+
197
+ data = np .reshape (data .astype (float ),
198
+ (self .n_samples ,
199
+ self .n_total ))
200
+ data = zscore (data [:, mask ], axis = 0 )
201
+
202
+ fit = np .zeros (candidates .size )
203
+
204
+ s_sample = self .n_samples // folds
205
+
206
+ for i in range (candidates .size ):
207
+ for k in range (folds ):
208
+
209
+ n_sub = k * s_sample + s_sample
210
+ trn_X = self .X [:n_sub , :]
211
+ trn_data = data [:n_sub , :]
212
+
213
+ tst_X = self .X [n_sub ::, :]
214
+ tst_data = data [n_sub ::, :]
215
+
216
+ beta = regress (trn_data , trn_X , candidates [i ])
217
+
218
+ Y = np .matmul (tst_X , beta )
219
+
220
+ mag_d = np .sqrt (np .sum (tst_data ** 2 , axis = 0 ))
221
+ mag_y = np .sqrt (np .sum (Y ** 2 , axis = 0 ))
222
+
223
+ corr_fit = np .mean (np .sum (Y * tst_data , axis = 0 ) / (mag_y * mag_d ))
224
+ fit [i ] += (corr_fit - fit [i ]) / (k + 1 )
225
+
226
+ idx = np .argmax (fit )
227
+
228
+ self .penalty = candidates [idx ]
229
+
230
+
231
+ def perform_ridge (self , data , mask = None , penalty = None ):
232
+ '''
233
+ performs ridge regression and returns a dictionary
234
+ with the following keys
235
+ - corr_fit:
236
+ - beta:
237
+
238
+ The dimension of corr_fit corresponds to the dimensions of
239
+ the data. beta has an additional dimension with num_predictors
240
+ elements.
241
+ %
242
+ required input is
243
+ - data : floating point array
244
+ empirically observed BOLD timecourses
245
+ whose rows correspond to time (volumes).
246
+
247
+ optional inputs are
248
+ - lambda: float
249
+ penalty parameter
250
+ - mask: boolean array
251
+ binary mask for selecting voxels (default = None)
252
+ '''
253
+
254
+ results = {'corr_fit' : np .zeros (self .n_total ),
255
+ 'beta' : np .zeros ((self .n_samples , self .n_predictors ))}
256
+ if penalty == None :
257
+ penalty = self .penalty
258
+
259
+ if mask .all ()== None :
260
+ mask = np .ones (self .n_total ).astype (bool )
261
+ else :
262
+ mask = np .reshape (mask ,self .n_total )
263
+
264
+ data = np .reshape (data .astype (float ),
265
+ (self .n_samples ,
266
+ self .n_total ))
267
+ data = zscore (data [:, mask ], axis = 0 )
268
+
269
+ mask = np .reshape (mask ,self .n_total )
270
+
271
+ beta = regress (data , self .X , penalty )
272
+ Y = np .matmul (self .X , beta )
273
+
274
+ mag_d = np .sqrt (np .sum (data ** 2 , axis = 0 ))
275
+ mag_y = np .sqrt (np .sum (Y ** 2 , axis = 0 ))
276
+
277
+ corr_fit = np .sum (Y * data , axis = 0 ) / (mag_y * mag_d )
278
+
279
+ results ['corr_fit' ][mask ] = corr_fit
280
+ results ['beta' ][mask , :] = beta
281
+
282
+ results ['corr_fit' ] = np .squeeze (
283
+ np .reshape (
284
+ results ['corr_fit' ],
285
+ (self .n_rows ,
286
+ self .n_cols ,
287
+ self .n_slices )))
288
+
289
+ results ['beta' ] = np .squeeze (
290
+ np .reshape (results ['beta' ],
291
+ (self .n_rows ,
292
+ self .n_cols ,
293
+ self .n_slices ,
294
+ self .n_predictors )))
295
+
296
+ return results
297
+
0 commit comments