|
| 1 | +# Authors: Alex Rockhill <aprockhill@mailbox.org> |
| 2 | +# |
| 3 | +# License: BSD-3-Clause |
| 4 | + |
| 5 | +import numpy as np |
| 6 | + |
| 7 | +from ...channels import DigMontage, make_dig_montage |
| 8 | +from ...surface import _voxel_neighbors |
| 9 | +from ...transforms import apply_trans, _frame_to_str, Transform |
| 10 | +from ...utils import (verbose, warn, _pl, _validate_type, _require_version, |
| 11 | + _check_option) |
| 12 | + |
| 13 | + |
| 14 | +@verbose |
| 15 | +def warp_montage(montage, moving, static, reg_affine, sdr_morph, verbose=None): |
| 16 | + """Warp a montage to a template with image volumes using SDR. |
| 17 | +
|
| 18 | + .. note:: This is likely only applicable for channels inside the brain |
| 19 | + (intracranial electrodes). |
| 20 | +
|
| 21 | + Parameters |
| 22 | + ---------- |
| 23 | + montage : instance of mne.channels.DigMontage |
| 24 | + The montage object containing the channels. |
| 25 | + %(moving)s |
| 26 | + %(static)s |
| 27 | + %(reg_affine)s |
| 28 | + %(sdr_morph)s |
| 29 | + %(verbose)s |
| 30 | +
|
| 31 | + Returns |
| 32 | + ------- |
| 33 | + montage_warped : mne.channels.DigMontage |
| 34 | + The modified montage object containing the channels. |
| 35 | + """ |
| 36 | + _require_version('nibabel', 'warp montage', '2.1.0') |
| 37 | + _require_version('dipy', 'warping points using SDR', '1.6.0') |
| 38 | + |
| 39 | + from nibabel import MGHImage |
| 40 | + from nibabel.spatialimages import SpatialImage |
| 41 | + from dipy.align.imwarp import DiffeomorphicMap |
| 42 | + |
| 43 | + _validate_type(moving, SpatialImage, 'moving') |
| 44 | + _validate_type(static, SpatialImage, 'static') |
| 45 | + _validate_type(reg_affine, np.ndarray, 'reg_affine') |
| 46 | + _check_option('reg_affine.shape', reg_affine.shape, ((4, 4),)) |
| 47 | + _validate_type(sdr_morph, (DiffeomorphicMap, None), 'sdr_morph') |
| 48 | + _validate_type(montage, DigMontage, 'montage') |
| 49 | + |
| 50 | + moving_mgh = MGHImage(np.array(moving.dataobj).astype(np.float32), |
| 51 | + moving.affine) |
| 52 | + static_mgh = MGHImage(np.array(static.dataobj).astype(np.float32), |
| 53 | + static.affine) |
| 54 | + del moving, static |
| 55 | + |
| 56 | + # get montage channel coordinates |
| 57 | + ch_dict = montage.get_positions() |
| 58 | + if ch_dict['coord_frame'] != 'mri': |
| 59 | + bad_coord_frames = np.unique([d['coord_frame'] for d in montage.dig]) |
| 60 | + bad_coord_frames = ', '.join([ |
| 61 | + _frame_to_str[cf] if cf in _frame_to_str else str(cf) |
| 62 | + for cf in bad_coord_frames]) |
| 63 | + raise RuntimeError('Coordinate frame not supported, expected ' |
| 64 | + f'"mri", got {bad_coord_frames}') |
| 65 | + ch_names = list(ch_dict['ch_pos'].keys()) |
| 66 | + ch_coords = np.array([ch_dict['ch_pos'][name] for name in ch_names]) |
| 67 | + |
| 68 | + ch_coords = apply_trans( # convert to moving voxel space |
| 69 | + np.linalg.inv(moving_mgh.header.get_vox2ras_tkr()), ch_coords * 1000) |
| 70 | + # next, to moving scanner RAS |
| 71 | + ch_coords = apply_trans(moving_mgh.header.get_vox2ras(), ch_coords) |
| 72 | + |
| 73 | + # now, apply reg_affine |
| 74 | + ch_coords = apply_trans(Transform( # to static ras |
| 75 | + fro='ras', to='ras', trans=np.linalg.inv(reg_affine)), ch_coords) |
| 76 | + |
| 77 | + # now, apply SDR morph |
| 78 | + if sdr_morph is not None: |
| 79 | + ch_coords = sdr_morph.transform_points( |
| 80 | + ch_coords, sdr_morph.domain_grid2world, |
| 81 | + sdr_morph.domain_world2grid) |
| 82 | + |
| 83 | + # back to voxels but now for the static image |
| 84 | + ch_coords = apply_trans(np.linalg.inv(static_mgh.header.get_vox2ras()), |
| 85 | + ch_coords) |
| 86 | + |
| 87 | + # finally, back to surface RAS |
| 88 | + ch_coords = apply_trans(static_mgh.header.get_vox2ras_tkr(), |
| 89 | + ch_coords) / 1000 |
| 90 | + |
| 91 | + # make warped montage |
| 92 | + montage_warped = make_dig_montage( |
| 93 | + dict(zip(ch_names, ch_coords)), coord_frame='mri') |
| 94 | + return montage_warped |
| 95 | + |
| 96 | + |
| 97 | +def _warn_missing_chs(info, dig_image, after_warp=False): |
| 98 | + """Warn that channels are missing.""" |
| 99 | + # ensure that each electrode contact was marked in at least one voxel |
| 100 | + missing = set(np.arange(1, len(info.ch_names) + 1)).difference( |
| 101 | + set(np.unique(np.array(dig_image.dataobj)))) |
| 102 | + missing_ch = [info.ch_names[idx - 1] for idx in missing] |
| 103 | + if missing_ch: |
| 104 | + warn(f'Channel{_pl(missing_ch)} ' |
| 105 | + f'{", ".join(repr(ch) for ch in missing_ch)} not assigned ' |
| 106 | + 'voxels ' + |
| 107 | + (f' after applying {after_warp}' if after_warp else '')) |
| 108 | + |
| 109 | + |
| 110 | +@verbose |
| 111 | +def make_montage_volume(montage, base_image, thresh=0.5, max_peak_dist=1, |
| 112 | + voxels_max=100, use_min=False, verbose=None): |
| 113 | + """Make a volume from intracranial electrode contact locations. |
| 114 | +
|
| 115 | + Find areas of the input volume with intensity greater than |
| 116 | + a threshold surrounding local extrema near the channel location. |
| 117 | + Monotonicity from the peak is enforced to prevent channels |
| 118 | + bleeding into each other. |
| 119 | +
|
| 120 | + Parameters |
| 121 | + ---------- |
| 122 | + montage : instance of mne.channels.DigMontage |
| 123 | + The montage object containing the channels. |
| 124 | + base_image : path-like | nibabel.spatialimages.SpatialImage |
| 125 | + Path to a volumetric scan (e.g. CT) of the subject. Can be in any |
| 126 | + format readable by nibabel. Can also be a nibabel image object. |
| 127 | + Local extrema (max or min) should be nearby montage channel locations. |
| 128 | + thresh : float |
| 129 | + The threshold relative to the peak to determine the size |
| 130 | + of the sensors on the volume. |
| 131 | + max_peak_dist : int |
| 132 | + The number of voxels away from the channel location to |
| 133 | + look in the ``image``. This will depend on the accuracy of |
| 134 | + the channel locations, the default (one voxel in all directions) |
| 135 | + will work only with localizations that are that accurate. |
| 136 | + voxels_max : int |
| 137 | + The maximum number of voxels for each channel. |
| 138 | + use_min : bool |
| 139 | + Whether to hypointensities in the volume as channel locations. |
| 140 | + Default False uses hyperintensities. |
| 141 | + %(verbose)s |
| 142 | +
|
| 143 | + Returns |
| 144 | + ------- |
| 145 | + elec_image : nibabel.spatialimages.SpatialImage |
| 146 | + An image in Freesurfer surface RAS space with voxel values |
| 147 | + corresponding to the index of the channel. The background |
| 148 | + is 0s and this index starts at 1. |
| 149 | + """ |
| 150 | + _require_version('nibabel', 'montage volume', '2.1.0') |
| 151 | + import nibabel as nib |
| 152 | + |
| 153 | + _validate_type(montage, DigMontage, 'montage') |
| 154 | + _validate_type(base_image, nib.spatialimages.SpatialImage, 'base_image') |
| 155 | + _validate_type(thresh, float, 'thresh') |
| 156 | + if thresh < 0 or thresh >= 1: |
| 157 | + raise ValueError(f'`thresh` must be between 0 and 1, got {thresh}') |
| 158 | + _validate_type(max_peak_dist, int, 'max_peak_dist') |
| 159 | + _validate_type(voxels_max, int, 'voxels_max') |
| 160 | + _validate_type(use_min, bool, 'use_min') |
| 161 | + |
| 162 | + # load image and make sure it's in surface RAS |
| 163 | + if not isinstance(base_image, nib.spatialimages.SpatialImage): |
| 164 | + base_image = nib.load(base_image) |
| 165 | + |
| 166 | + base_image_mgh = nib.MGHImage( |
| 167 | + np.array(base_image.dataobj).astype(np.float32), base_image.affine) |
| 168 | + del base_image |
| 169 | + |
| 170 | + # get montage channel coordinates |
| 171 | + ch_dict = montage.get_positions() |
| 172 | + if ch_dict['coord_frame'] != 'mri': |
| 173 | + bad_coord_frames = np.unique([d['coord_frame'] for d in montage.dig]) |
| 174 | + bad_coord_frames = ', '.join([ |
| 175 | + _frame_to_str[cf] if cf in _frame_to_str else str(cf) |
| 176 | + for cf in bad_coord_frames]) |
| 177 | + raise RuntimeError('Coordinate frame not supported, expected ' |
| 178 | + f'"mri", got {bad_coord_frames}') |
| 179 | + |
| 180 | + ch_names = list(ch_dict['ch_pos'].keys()) |
| 181 | + ch_coords = np.array([ch_dict['ch_pos'][name] for name in ch_names]) |
| 182 | + |
| 183 | + # convert to voxel space |
| 184 | + ch_coords = apply_trans( |
| 185 | + np.linalg.inv(base_image_mgh.header.get_vox2ras_tkr()), |
| 186 | + ch_coords * 1000) |
| 187 | + |
| 188 | + # take channel coordinates and use the image to transform them |
| 189 | + # into a volume where all the voxels over a threshold nearby |
| 190 | + # are labeled with an index |
| 191 | + image_data = np.array(base_image_mgh.dataobj) |
| 192 | + if use_min: |
| 193 | + image_data *= -1 |
| 194 | + elec_image = np.zeros(base_image_mgh.shape, dtype=int) |
| 195 | + for i, ch_coord in enumerate(ch_coords): |
| 196 | + if np.isnan(ch_coord).any(): |
| 197 | + continue |
| 198 | + # this looks up to a voxel away, it may be marked imperfectly |
| 199 | + volume = _voxel_neighbors(ch_coord, image_data, thresh=thresh, |
| 200 | + max_peak_dist=max_peak_dist, |
| 201 | + voxels_max=voxels_max) |
| 202 | + for voxel in volume: |
| 203 | + if elec_image[voxel] != 0: |
| 204 | + # some voxels ambiguous because the contacts are bridged on |
| 205 | + # the image so assign the voxel to the nearest contact location |
| 206 | + dist_old = np.sqrt( |
| 207 | + (ch_coords[elec_image[voxel] - 1] - voxel)**2).sum() |
| 208 | + dist_new = np.sqrt((ch_coord - voxel)**2).sum() |
| 209 | + if dist_new < dist_old: |
| 210 | + elec_image[voxel] = i + 1 |
| 211 | + else: |
| 212 | + elec_image[voxel] = i + 1 |
| 213 | + |
| 214 | + # assemble the volume |
| 215 | + elec_image = nib.spatialimages.SpatialImage( |
| 216 | + elec_image, base_image_mgh.affine) |
| 217 | + _warn_missing_chs(montage, elec_image, after_warp=False) |
| 218 | + |
| 219 | + return elec_image |
0 commit comments