Skip to content

feat: Implement flexible ArrayLike input handling for arrays, lists, and tuples #73

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions src/osipi/_aif.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import numpy as np
from numpy.typing import NDArray
from numpy.typing import ArrayLike, NDArray


def aif_parker(
t: NDArray[np.floating], BAT: np.floating = 0.0, Hct: np.floating = 0.0
t: ArrayLike, BAT: np.floating = 0.0, Hct: np.floating = 0.0
) -> NDArray[np.floating]:
"""AIF model as defined by Parker et al (2005)

Args:
t (NDArray[np.floating]): array of time points in units of sec. [OSIPI code Q.GE1.004]
t (ArrayLike): array of time points in units of sec. [OSIPI code Q.GE1.004]
BAT (np.floating, optional):
Time in seconds before the bolus arrives. Defaults to 0. [OSIPI code Q.BA1.001]
Hct (np.floating, optional):
Expand Down Expand Up @@ -45,8 +45,14 @@ def aif_parker(
>>> plt.show()

"""
# Convert input to numpy array with appropriate dtype
t_arr = np.asarray(t)
# Ensure floating-point dtype
if not np.issubdtype(t_arr.dtype, np.floating):
t_arr = t_arr.astype(np.float64)

# Convert from OSIPI units (sec) to units used internally (mins)
t_min = t / 60
t_min = t_arr / 60
bat_min = BAT / 60

t_offset = t_min - bat_min
Expand All @@ -71,7 +77,7 @@ def aif_parker(
return pop_aif


def aif_georgiou(t: NDArray[np.floating], BAT: np.floating = 0.0) -> NDArray[np.floating]:
def aif_georgiou(t: ArrayLike, BAT: np.floating = 0.0) -> NDArray[np.floating]:
"""AIF model as defined by Georgiou et al.

Note:
Expand All @@ -80,7 +86,7 @@ def aif_georgiou(t: NDArray[np.floating], BAT: np.floating = 0.0) -> NDArray[np.
so nobody ever has to write this function again!

Args:
t (NDArray[np.floating]): array of time points in units of sec. [OSIPI code Q.GE1.004]
t (ArrayLike): array of time points in units of sec. [OSIPI code Q.GE1.004]
BAT (np.floating, optional):
Time in seconds before the bolus arrives. Defaults to 0sec. [OSIPI code Q.BA1.001]

Expand Down Expand Up @@ -119,13 +125,12 @@ def aif_georgiou(t: NDArray[np.floating], BAT: np.floating = 0.0) -> NDArray[np.

msg = "This function is not yet implemented \n"
msg += (
"If you implement it yourself, please consider submitting it"
" as an OSIPI code contribution"
"If you implement it yourself, please consider submitting it as an OSIPI code contribution"
)
raise NotImplementedError(msg)


def aif_weinmann(t: NDArray[np.floating], BAT: np.floating = 0.0) -> NDArray[np.floating]:
def aif_weinmann(t: ArrayLike, BAT: np.floating = 0.0) -> NDArray[np.floating]:
"""AIF model as defined by Weinmann et al.

Note:
Expand All @@ -134,7 +139,7 @@ def aif_weinmann(t: NDArray[np.floating], BAT: np.floating = 0.0) -> NDArray[np.
so nobody ever has to write this function again!

Args:
t (NDArray[np.floating]): array of time points in units of sec. [OSIPI code Q.GE1.004]
t (ArrayLike): array of time points in units of sec. [OSIPI code Q.GE1.004]
BAT (np.floating, optional):
Time in seconds before the bolus arrives. Defaults to 0sec. [OSIPI code Q.BA1.001]

Expand Down Expand Up @@ -172,7 +177,6 @@ def aif_weinmann(t: NDArray[np.floating], BAT: np.floating = 0.0) -> NDArray[np.

msg = "This function is not yet implemented \n"
msg += (
"If you implement it yourself, please consider submitting it"
" as an OSIPI code contribution"
"If you implement it yourself, please consider submitting it as an OSIPI code contribution"
)
raise NotImplementedError(msg)
32 changes: 20 additions & 12 deletions src/osipi/_convolution.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,42 @@
import numpy as np
from numpy.typing import NDArray
from numpy.typing import ArrayLike, NDArray


def exp_conv(
T: np.floating, t: NDArray[np.floating], a: NDArray[np.floating]
) -> NDArray[np.floating]:
def exp_conv(T: np.floating, t: ArrayLike, a: ArrayLike) -> NDArray[np.floating]:
"""Exponential convolution operation of (1/T)exp(-t/T) with a.

Args:
T (np.floating): exponent in time units
t (NDArray[np.floating]): array of time points
a (NDArray[np.floating]): array to be convolved with time exponential
t (ArrayLike): array of time points
a (ArrayLike): array to be convolved with time exponential

Returns:
NDArray[np.floating]: convolved array
"""
# Convert inputs to numpy arrays
t_arr = np.asarray(t)
a_arr = np.asarray(a)

# Ensure floating-point dtype while preserving original precision
if not np.issubdtype(t_arr.dtype, np.floating):
t_arr = t_arr.astype(np.float64)
if not np.issubdtype(a_arr.dtype, np.floating):
a_arr = a_arr.astype(np.float64)

if T == 0:
return a
return a_arr

n = len(t)
f = np.zeros((n,))
n = len(t_arr)
f = np.zeros((n,), dtype=t_arr.dtype) # Use same dtype as t_arr

x = (t[1 : n - 1] - t[0 : n - 2]) / T
da = (a[1 : n - 1] - a[0 : n - 2]) / x
x = (t_arr[1 : n - 1] - t_arr[0 : n - 2]) / T
da = (a_arr[1 : n - 1] - a_arr[0 : n - 2]) / x

E = np.exp(-x)
E0 = 1 - E
E1 = x - E0

add = a[0 : n - 2] * E0 + da * E1
add = a_arr[0 : n - 2] * E0 + da * E1

for i in range(0, n - 2):
f[i + 1] = E[i] * f[i] + add[i]
Expand Down
20 changes: 13 additions & 7 deletions src/osipi/_electromagnetic_property.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import numpy as np
from numpy.typing import NDArray
from numpy.typing import ArrayLike, NDArray


def R1_to_C_linear_relaxivity(
R1: NDArray[np.floating], R10: np.floating, r1: np.floating
R1: ArrayLike, R10: np.floating, r1: np.floating
) -> NDArray[np.floating]:
"""
Electromagnetic property inverse model:
Expand All @@ -12,7 +12,7 @@ def R1_to_C_linear_relaxivity(
Converts R1 to tissue concentration

Args:
R1 (1D array of np.floating):
R1 (ArrayLike):
Vector of longitudinal relaxation rate in units of /s. [OSIPI code Q.EL1.001]
R10 (np.floating):
Native longitudinal relaxation rate in units of /s. [OSIPI code Q.EL1.002]
Expand All @@ -32,9 +32,15 @@ def R1_to_C_linear_relaxivity(
longitudinal relaxation rate, linear with relaxivity model [OSIPI code M.EL1.003]
- Adapted from equation given in lexicon
"""
# Check R1 is a 1D array of floats
if not (isinstance(R1, np.ndarray) and R1.ndim == 1 and np.issubdtype(R1.dtype, np.floating)):
raise TypeError("R1 must be a 1D NumPy array of np.floating")
# Convert input to numpy array with appropriate dtype
R1_arr = np.asarray(R1)
# Ensure floating-point dtype
if not np.issubdtype(R1_arr.dtype, np.floating):
R1_arr = R1_arr.astype(np.float64)

# Check R1 is a 1D array
if not (R1_arr.ndim == 1):
raise TypeError("R1 must be a 1D array-like object of floating point values")
elif not (r1 >= 0):
raise ValueError("r1 must be positive")
return (R1 - R10) / r1 # C
return (R1_arr - R10) / r1 # C
37 changes: 26 additions & 11 deletions src/osipi/_signal.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import numpy as np
from numpy.typing import NDArray
from numpy.typing import ArrayLike, NDArray


# Use a more generic floating-point type annotation
def signal_linear(R1: NDArray[np.floating], k: np.floating) -> NDArray[np.floating]:
def signal_linear(R1: ArrayLike, k: np.floating) -> NDArray[np.floating]:
"""Linear model for relationship between R1 and magnitude signal
Args:
R1 (NDArray[np.floating]): longitudinal relaxation rate in units of /s. [OSIPI code Q.EL1.001]
R1 (ArrayLike): longitudinal relaxation rate in units of /s. [OSIPI code Q.EL1.001]
k (np.floating): proportionality constant in a.u. S [OSIPI code Q.GE1.009]
Returns:
NDArray[np.floating]: magnitude signal in a.u. [OSIPI code Q.MS1.001]
Expand All @@ -16,20 +15,26 @@ def signal_linear(R1: NDArray[np.floating], k: np.floating) -> NDArray[np.floati
- OSIPI name: Linear model
- Adapted from equation given in the Lexicon
"""
# Convert input to numpy array with appropriate dtype
R1_arr = np.asarray(R1)
# Ensure floating-point dtype
if not np.issubdtype(R1_arr.dtype, np.floating):
R1_arr = R1_arr.astype(np.float64)

# calculate signal
return k * R1 # S
return k * R1_arr # S


def signal_SPGR(
R1: NDArray[np.floating],
S0: NDArray[np.floating],
R1: ArrayLike,
S0: ArrayLike,
TR: np.floating,
a: np.floating,
) -> NDArray[np.floating]:
"""Steady-state signal for SPGR sequence.
Args:
R1 (NDArray[np.floating]): longitudinal relaxation rate in units of /s. [OSIPI code Q.EL1.001]
S0 (NDArray[np.floating]): fully T1-relaxed signal in a.u. [OSIPI code Q.MS1.010]
R1 (ArrayLike): longitudinal relaxation rate in units of /s. [OSIPI code Q.EL1.001]
S0 (ArrayLike): fully T1-relaxed signal in a.u. [OSIPI code Q.MS1.010]
TR (np.floating): repetition time in units of s. [OSIPI code Q.MS1.006]
a (np.floating): prescribed flip angle in units of deg. [OSIPI code Q.MS1.007]
Returns:
Expand All @@ -40,7 +45,17 @@ def signal_SPGR(
- OSIPI name: Spoiled gradient recalled echo model
- Adapted from equation given in the Lexicon and contribution from MJT_UoEdinburgh_UK
"""
# Convert inputs to numpy arrays with appropriate dtype
R1_arr = np.asarray(R1)
S0_arr = np.asarray(S0)

# Ensure floating-point dtype
if not np.issubdtype(R1_arr.dtype, np.floating):
R1_arr = R1_arr.astype(np.float64)
if not np.issubdtype(S0_arr.dtype, np.floating):
S0_arr = S0_arr.astype(np.float64)

# calculate signal
a_rad = a * np.pi / 180
exp_TR_R1 = np.exp(-TR * R1)
return S0 * (((1.0 - exp_TR_R1) * np.sin(a_rad)) / (1.0 - exp_TR_R1 * np.cos(a_rad))) # S
exp_TR_R1 = np.exp(-TR * R1_arr)
return S0_arr * (((1.0 - exp_TR_R1) * np.sin(a_rad)) / (1.0 - exp_TR_R1 * np.cos(a_rad))) # S
22 changes: 14 additions & 8 deletions src/osipi/_signal_to_concentration.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import numpy as np
from numpy.typing import NDArray
from numpy.typing import ArrayLike, NDArray

from ._electromagnetic_property import R1_to_C_linear_relaxivity


def S_to_C_via_R1_SPGR(
S: NDArray[np.floating],
S: ArrayLike,
S_baseline: np.floating,
R10: np.floating,
TR: np.floating,
Expand All @@ -19,7 +19,7 @@ def S_to_C_via_R1_SPGR(
Converts S -> R1 -> C

Args:
S (1D array of np.floating): Vector of magnitude signals in a.u. [OSIPI code Q.MS1.001]
S (ArrayLike): Vector of magnitude signals in a.u. [OSIPI code Q.MS1.001]
S_baseline (np.floating): Pre-contrast magnitude signal in a.u. [OSIPI code Q.MS1.001]
R10 (np.floating): Native longitudinal relaxation rate in units of /s. [OSIPI code Q.EL1.002]
TR (np.floating): Repetition time in units of s. [OSIPI code Q.MS1.006]
Expand Down Expand Up @@ -50,7 +50,7 @@ def S_to_C_via_R1_SPGR(


def S_to_R1_SPGR(
S: NDArray[np.floating],
S: ArrayLike,
S_baseline: np.floating,
R10: np.floating,
TR: np.floating,
Expand All @@ -62,7 +62,7 @@ def S_to_R1_SPGR(
Converts Signal to R1

Args:
S (1D array of np.floating): Vector of magnitude signals in a.u. [OSIPI code Q.MS1.001]
S (ArrayLike): Vector of magnitude signals in a.u. [OSIPI code Q.MS1.001]
S_baseline (np.floating): Pre-contrast magnitude signal in a.u. [OSIPI code Q.MS1.001]
R10 (np.floating): Native longitudinal relaxation rate in units of /s. [OSIPI code Q.EL1.002]
TR (np.floating): Repetition time in units of s. [OSIPI code Q.MS1.006]
Expand All @@ -79,14 +79,20 @@ def S_to_R1_SPGR(
- Forward model: Spoiled gradient recalled echo model [OSIPI code M.SM2.002]
- Adapted from contribution of LEK_UoEdinburgh_UK
"""
# Convert input to numpy array with appropriate dtype
S_arr = np.asarray(S)
# Ensure floating-point dtype
if not np.issubdtype(S_arr.dtype, np.floating):
S_arr = S_arr.astype(np.float64)

# Check S is a 1D array of floats
if not (isinstance(S, np.ndarray) and S.ndim == 1 and np.issubdtype(S.dtype, np.floating)):
raise TypeError("S must be a 1D NumPy array of np.floating")
if not (S_arr.ndim == 1):
raise TypeError("S must be a 1D array-like object of floating point values")

a_rad = a * np.pi / 180
# Estimate fully T1-relaxed signal S0 in units of a.u. [OSIPI code Q.MS1.010], then R1
exp_TR_R10 = np.exp(-TR * R10)
sin_a = np.sin(a_rad)
cos_a = np.cos(a_rad)
S0 = S_baseline * (1 - cos_a * exp_TR_R10) / (sin_a * (1 - exp_TR_R10))
return np.log(((S0 * sin_a) - S) / (S0 * sin_a - (S * cos_a))) * (-1 / TR) # R1
return np.log(((S0 * sin_a) - S_arr) / (S0 * sin_a - (S_arr * cos_a))) * (-1 / TR) # R1
Loading
Loading