Skip to content

Enhance _cwt.py by introducing a configurable hop size parameter #804

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 38 additions & 18 deletions pywt/_cwt.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,27 @@

import numpy as np


def next_fast_len(n):
"""Round up size to the nearest power of two.

Given a number of samples `n`, returns the next power of two
following this number to take advantage of FFT speedup.
"""
return 2**ceil(np.log2(n))


def cwt(data, scales, wavelet, sampling_period=1., method='conv', axis=-1):
try:
import scipy
fftmodule = scipy.fft
next_fast_len = fftmodule.next_fast_len
except ImportError:
fftmodule = np.fft

# provide a fallback so scipy is an optional requirement
# note: numpy.fft in numpy 2.0 is as fast as scipy.fft, so could be used
# unconditionally once the minimum supported numpy version is >=2.0
def next_fast_len(n):
"""Round up size to the nearest power of two.

Given a number of samples `n`, returns the next power of two
following this number to take advantage of FFT speedup.
This fallback is less efficient than `scipy.fftpack.next_fast_len`
"""
return 2**ceil(np.log2(n))


def cwt(data, scales, wavelet, hop_size=1, sampling_period=1., method='conv', axis=-1):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please move the new keyword to the end of the signature? It's not backwards-compatible to add a new keyword in the middle. That would break calls like cwt(data, scales, wavelet, 2.5) (2.5 was the sampling period).

Also, please add it as *, hop_size=1 (the * means keyword-only).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The examples and tests then need to be updated for that change.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. New key word hop_size is move to the end, and defined as keyword-only.
  2. The examples and tests are revised accordingly and passed.

"""
cwt(data, scales, wavelet)

Expand Down Expand Up @@ -114,7 +124,12 @@ def cwt(data, scales, wavelet, sampling_period=1., method='conv', axis=-1):
raise AxisError("axis must be a scalar.")

dt_out = dt_cplx if wavelet.complex_cwt else dt
out = np.empty((np.size(scales),) + data.shape, dtype=dt_out)

# out length of transform when applying down sampling
downsampled_length = len(data) // hop_size
data_sampled = np.empty((1, downsampled_length))
out = np.empty((np.size(scales), downsampled_length), dtype=dt_out)

precision = 10
int_psi, x = integrate_wavelet(wavelet, precision=precision)
int_psi = np.conj(int_psi) if wavelet.complex_cwt else int_psi
Expand Down Expand Up @@ -167,25 +182,30 @@ def cwt(data, scales, wavelet, sampling_period=1., method='conv', axis=-1):
)
if size_scale != size_scale0:
# Must recompute fft_data when the padding size changes.
fft_data = np.fft.fft(data, size_scale, axis=-1)
fft_data = fftmodule.fft(data, size_scale, axis=-1)
size_scale0 = size_scale
fft_wav = np.fft.fft(int_psi_scale, size_scale, axis=-1)
conv = np.fft.ifft(fft_wav * fft_data, axis=-1)
fft_wav = fftmodule.fft(int_psi_scale, size_scale, axis=-1)
conv = fftmodule.ifft(fft_wav * fft_data, axis=-1)
conv = conv[..., :data.shape[-1] + int_psi_scale.size - 1]

coef = - np.sqrt(scale) * np.diff(conv, axis=-1)
coef_temp = - np.sqrt(scale) * np.diff(conv, axis=-1)

# Apply time downsampling
coef = coef_temp[::hop_size] # Selecting every `hop_size`-th sample

if out.dtype.kind != 'c':
coef = coef.real

# transform axis is always -1 due to the data reshape above
d = (coef.shape[-1] - data.shape[-1]) / 2.
d = (coef.shape[-1] - data_sampled.shape[-1]) / 2.
if d > 0:
coef = coef[..., floor(d):-ceil(d)]
elif d < 0:
raise ValueError(
f"Selected scale of {scale} too small.")
if data.ndim > 1:
# restore original data shape and axis position
coef = coef.reshape(data_shape_pre)
coef = coef.reshape(data_sampled)
coef = coef.swapaxes(axis, -1)
out[i, ...] = coef

Expand Down