
Fixed #772 Add Ray Support #986


Merged
merged 32 commits into main from add_ray_support on Jul 6, 2024
Changes from all commits (32 commits):
24190fa
Initial commit for adding ray support
seanlaw Jun 9, 2024
b037dde
Fixed missing import
seanlaw Jun 10, 2024
6e4bf0a
Fixed typo
seanlaw Jun 10, 2024
5aa784f
Refactored coverage reporting
seanlaw Jun 10, 2024
46dbe91
Force codecov to use coverage.py report instead of generating its own
seanlaw Jun 10, 2024
c96e584
Specify coverage.xml in PWD
seanlaw Jun 10, 2024
d902e45
Added verbose flag
seanlaw Jun 10, 2024
2bc2d4c
Minor change
seanlaw Jun 10, 2024
a900b26
Added new codecov token
seanlaw Jun 10, 2024
47759e3
Added temporary break in test coverage
seanlaw Jun 10, 2024
f177af8
Minor change
seanlaw Jun 10, 2024
dc76940
Changed name of coverage file
seanlaw Jun 10, 2024
c536ae9
Expand coverage to all tests, removed break
seanlaw Jun 10, 2024
ec19b48
Removed comments in workflow
seanlaw Jun 10, 2024
33ca68f
Removed codecov patch/project status
seanlaw Jun 11, 2024
1b620be
Minor change
seanlaw Jun 11, 2024
8df0f37
Added ability to specify coverage.xml file
seanlaw Jun 11, 2024
795bbef
Split xml report and displaying report
seanlaw Jun 11, 2024
5a819af
Updated docstrings to include `ray` example
seanlaw Jun 11, 2024
00e511a
Fixed flake8 problem
seanlaw Jun 11, 2024
f2f4e47
Added ray docstring examples
seanlaw Jun 11, 2024
a86ccb2
Fixed typo
seanlaw Jun 11, 2024
1c5bb71
Reverted missing docstring
seanlaw Jun 11, 2024
9154613
Minor change
seanlaw Jun 12, 2024
c0ca23d
Reverted docstring
seanlaw Jun 12, 2024
3d93948
Merge branch 'TDAmeritrade:main' into add_ray_support
seanlaw Jun 25, 2024
22e462a
Merge branch 'main' into add_ray_support
seanlaw Jun 27, 2024
e4940f6
Minor changes
seanlaw Jun 30, 2024
e33bd78
Merge branch 'main' into add_ray_support
seanlaw Jun 30, 2024
3e5b1b9
Added ray.shutdown()
seanlaw Jul 1, 2024
6a1c678
Changed how `step` is calculated to be more precise
seanlaw Jul 2, 2024
8af81d0
Fixed missing parentheses
seanlaw Jul 3, 2024
8 changes: 6 additions & 2 deletions .github/workflows/github-actions.yml
@@ -152,8 +152,12 @@ jobs:
- name: Run Coverage Tests
  run: ./test.sh coverage
  shell: bash
- name: Check Coverage Report
  run: coverage report -m --fail-under=100 --skip-covered --omit=docstring.py,min.py,stumpy/cache.py
- name: Generate Coverage Report
  run: ./test.sh report coverage.stumpy.xml
  shell: bash
- name: Upload Coverage Tests Results
  uses: codecov/codecov-action@v4
  with:
    file: ./coverage.stumpy.xml
    verbose: true
    token: ${{ secrets.CODECOV_TOKEN }}
3 changes: 2 additions & 1 deletion .gitignore
@@ -9,6 +9,7 @@ LOG*
PID
.coverage*
coverage.xml
stumpy.coverage.xml
dask-worker-space
stumpy.egg-info
build
@@ -20,4 +21,4 @@ docs/_build
.mypy_cache
.directory
test.py
*.nbconvert.ipynb
*.nbconvert.ipynb
4 changes: 4 additions & 0 deletions codecov.yml
@@ -0,0 +1,4 @@
coverage:
  status:
    project: off
    patch: off
15 changes: 15 additions & 0 deletions conda.sh
@@ -13,6 +13,9 @@ if [[ $# -gt 0 ]]; then
if [ $1 == "min" ]; then
install_mode="min"
echo "Installing minimum dependencies with install_mode=\"min\""
elif [[ $1 == "ray" ]]; then
install_mode="ray"
echo "Installing ray dependencies with install_mode=\"ray\""
elif [[ $1 == "numba" ]] && [[ "${arch_name}" != "arm64" ]]; then
install_mode="numba"
echo "Installing numba release candidate dependencies with install_mode=\"numba\""
@@ -57,6 +60,14 @@ generate_numba_environment_yaml()
grep -Ev "numba|python" environment.yml > environment.numba.yml
}

generate_ray_environment_yaml()
{
# Limit max Python version and append pip install ray
echo "Generating \"environment.ray.yml\" File"
ray_python=`./ray_python_version.py`
sed "/ - python/ s/$/,<=$ray_python/" environment.yml | cat - <(echo $' - pip\n - pip:\n - ray>=2.23.0') > environment.ray.yml
}

fix_libopenblas()
{
if [ ! -f $CONDA_PREFIX/lib/libopenblas.dylib ]; then
@@ -71,6 +82,7 @@ clean_up()
echo "Cleaning Up"
rm -rf "environment.min.yml"
rm -rf "environment.numba.yml"
rm -rf "environment.ray.yml"
}

###########
@@ -92,6 +104,9 @@ fi
if [[ $install_mode == "min" ]]; then
generate_min_environment_yaml
mamba env update --name $conda_env --file environment.min.yml || conda env update --name $conda_env --file environment.min.yml
elif [[ $install_mode == "ray" ]]; then
generate_ray_environment_yaml
mamba env update --name $conda_env --file environment.ray.yml || conda env update --name $conda_env --file environment.ray.yml
elif [[ $install_mode == "numba" ]]; then
echo ""
echo "Installing python=$python_version"
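Editor's note: the `sed`/`cat` one-liner in `generate_ray_environment_yaml` above is dense. As a rough sketch (not part of the PR), here is the same transformation in plain Python, assuming `environment.yml` pins python with a line such as "  - python>=3.9" and that `ray_python_version.py` printed "3.11":

ray_python = "3.11"  # hypothetical output of ./ray_python_version.py

with open("environment.yml") as f:
    lines = f.read().splitlines()

patched = []
for line in lines:
    if " - python" in line:
        line += f",<={ray_python}"  # e.g. "  - python>=3.9,<=3.11"
    patched.append(line)

# Mirror the cat/echo step: append the pip-installed ray dependency
patched += [" - pip", " - pip:", " - ray>=2.23.0"]

with open("environment.ray.yml", "w") as f:
    f.write("\n".join(patched) + "\n")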
File renamed without changes.
17 changes: 17 additions & 0 deletions ray_python_version.py
@@ -0,0 +1,17 @@
#!/usr/bin/env python

import requests
from packaging.version import Version

classifiers = (
    requests.get("https://pypi.org/pypi/ray/json").json().get("info").get("classifiers")
)

versions = []
for c in classifiers:
    x = c.split()
    if "Python" in x:
        versions.append(x[-1])

versions.sort(key=Version)
print(versions[-1])
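Editor's note: for context, the PyPI classifiers consumed by this script look like "Programming Language :: Python :: 3.11", so `c.split()` ends with the version string, and sorting with `packaging.version.Version` (rather than a plain string sort) keeps "3.9" from outranking "3.10". A self-contained check with illustrative values only:

from packaging.version import Version

# Illustrative classifiers; the real list comes from PyPI's JSON API
classifiers = [
    "Programming Language :: Python :: 3.9",
    "Programming Language :: Python :: 3.10",
    "Programming Language :: Python :: 3.11",
    "Operating System :: OS Independent",
]

versions = [c.split()[-1] for c in classifiers if "Python" in c.split()]
versions.sort(key=Version)
print(versions[-1])  # 3.11 (a plain string sort would have ranked 3.9 highest)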
12 changes: 5 additions & 7 deletions stumpy/aamp_stimp.py
@@ -486,16 +486,15 @@ def __init__(

class aamp_stimped(_aamp_stimp):
"""
Compute the Pan Matrix Profile with a distributed dask cluster
Compute the Pan Matrix Profile with a `dask`/`ray` cluster

This is based on the SKIMP algorithm.

Parameters
----------
client : client
A Dask or Ray Distributed client. Setting up a distributed cluster is beyond
the scope of this library. Please refer to the Dask or Ray Distributed
documentation.
A `dask`/`ray` client. Setting up a cluster is beyond the scope of this library.
Please refer to the `dask`/`ray` documentation.

T : numpy.ndarray
The time series or sequence for which to compute the pan matrix profile
@@ -556,9 +555,8 @@ def __init__(
Parameters
----------
client : client
A Dask or Ray Distributed client. Setting up a distributed cluster is beyond
the scope of this library. Please refer to the Dask or Ray Distributed
documentation.
A `dask`/`ray` client. Setting up a cluster is beyond the scope of this
library. Please refer to the `dask`/`ray` documentation.

T : numpy.ndarray
The time series or sequence for which to compute the pan matrix profile
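Editor's note: a minimal usage sketch for the updated docstring above (toy data; assumes `aamp_stimped` is importable from the top-level package the way its normalized sibling `stimped` is, and that the initialized `ray` module itself serves as the client, which is how `_ray_aamped` below uses it):

import numpy as np
import ray
import stumpy

T = np.random.rand(1_000)

ray.init()  # a local, single-machine "cluster"
pan = stumpy.aamp_stimped(ray, T)  # pass the initialized ray module as the client
pan.update()  # compute one additional layer of the pan matrix profile
ray.shutdown()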
163 changes: 154 additions & 9 deletions stumpy/aamped.py
@@ -24,7 +24,7 @@ def _dask_aamped(
):
"""
Compute the non-normalized (i.e., without z-normalization) matrix profile with a
distributed dask cluster
`dask` cluster

This is a highly distributed implementation around the Numba JIT-compiled
parallelized `_aamp` function which computes the non-normalized matrix profile
@@ -33,17 +33,15 @@
Parameters
----------
dask_client : client
A Dask Distributed client. Setting up a distributed cluster is beyond
the scope of this library. Please refer to the Dask Distributed
documentation.
A `dask` client. Setting up a cluster is beyond the scope of this library.
Please refer to the `dask` documentation.

T_A : numpy.ndarray
The time series or sequence for which to compute the matrix profile

T_B : numpy.ndarray
The time series or sequence that will be used to annotate T_A. For every
subsequence in T_A, its nearest neighbor in T_B will be recorded. Default is
`None` which corresponds to a self-join.
subsequence in T_A, its nearest neighbor in T_B will be recorded.

m : int
Window size
@@ -159,9 +157,157 @@ def _dask_aamped(
    return out


def _ray_aamped(
    ray_client,
    T_A,
    T_B,
    m,
    T_A_subseq_isfinite,
    T_B_subseq_isfinite,
    p,
    diags,
    ignore_trivial,
    k,
):
"""
Compute the non-normalized (i.e., without z-normalization) matrix profile with a
`ray` cluster

This is a highly distributed implementation around the Numba JIT-compiled
parallelized `_aamp` function which computes the non-normalized matrix profile
according to AAMP.

Parameters
----------
ray_client : client
A `ray` client. Setting up a cluster is beyond the scope of this library.
Please refer to the `ray` documentation.

T_A : numpy.ndarray
The time series or sequence for which to compute the matrix profile

T_B : numpy.ndarray
The time series or sequence that will be used to annotate T_A. For every
subsequence in T_A, its nearest neighbor in T_B will be recorded.

m : int
Window size

T_A_subseq_isfinite : numpy.ndarray
A boolean array that indicates whether a subsequence in `T_A` contains a
`np.nan`/`np.inf` value (False)

T_B_subseq_isfinite : numpy.ndarray
A boolean array that indicates whether a subsequence in `T_B` contains a
`np.nan`/`np.inf` value (False)

p : float
The p-norm to apply for computing the Minkowski distance. Minkowski distance is
typically used with `p` being 1 or 2, which correspond to the Manhattan distance
and the Euclidean distance, respectively.

diags : numpy.ndarray
The diagonal indices

ignore_trivial : bool, default True
Set to `True` if this is a self-join. Otherwise, for AB-join, set this
to `False`. Default is `True`.

k : int, default 1
The number of top `k` smallest distances used to construct the matrix profile.
Note that this will increase the total computational time and memory usage
when k > 1. If you have access to a GPU device, then you may be able to
leverage `gpu_stump` for better performance and scalability.

Returns
-------
out : numpy.ndarray
When k = 1 (default), the first column consists of the matrix profile,
the second column consists of the matrix profile indices, the third column
consists of the left matrix profile indices, and the fourth column consists
of the right matrix profile indices. However, when k > 1, the output array
will contain exactly 2 * k + 2 columns. The first k columns (i.e., out[:, :k])
consists of the top-k matrix profile, the next set of k columns
(i.e., out[:, k:2k]) consists of the corresponding top-k matrix profile
indices, and the last two columns (i.e., out[:, 2k] and out[:, 2k+1] or,
equivalently, out[:, -2] and out[:, -1]) correspond to the top-1 left
matrix profile indices and the top-1 right matrix profile indices, respectively.
"""
    core.check_ray(ray_client)

    n_A = T_A.shape[0]
    n_B = T_B.shape[0]
    l = n_A - m + 1

    nworkers = core.get_ray_nworkers(ray_client)

    ndist_counts = core._count_diagonal_ndist(diags, m, n_A, n_B)
    diags_ranges = core._get_array_ranges(ndist_counts, nworkers, False)
    diags_ranges += diags[0]
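    # Editor's note (not in the PR): _get_array_ranges balances the number of
    # pairwise distances per worker rather than the number of diagonals, since
    # diagonals closer to the main diagonal are longer and hold more distances.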

    # Scatter data to Ray cluster
    T_A_ref = ray_client.put(T_A)
    T_B_ref = ray_client.put(T_B)
    T_A_subseq_isfinite_ref = ray_client.put(T_A_subseq_isfinite)
    T_B_subseq_isfinite_ref = ray_client.put(T_B_subseq_isfinite)

    diags_refs = []
    for i in range(nworkers):
        diags_ref = ray_client.put(
            np.arange(diags_ranges[i, 0], diags_ranges[i, 1], dtype=np.int64)
        )
        diags_refs.append(diags_ref)

    ray_aamp_func = ray_client.remote(core.deco_ray_tor(_aamp))

    refs = []
    for i in range(nworkers):
        refs.append(
            ray_aamp_func.remote(
                T_A_ref,
                T_B_ref,
                m,
                T_A_subseq_isfinite_ref,
                T_B_subseq_isfinite_ref,
                p,
                diags_refs[i],
                ignore_trivial,
                k,
            )
        )

    results = ray_client.get(refs)
    # Must make a mutable copy from Ray's object store (ndarrays are immutable)
    profile, profile_L, profile_R, indices, indices_L, indices_R = [
        arr.copy() for arr in results[0]
    ]

    for i in range(1, nworkers):
        P, PL, PR, I, IL, IR = results[i]  # Read-only variables
        # Update top-k matrix profile and matrix profile indices
        core._merge_topk_PI(profile, P, indices, I)

        # Update top-1 left matrix profile and matrix profile index
        mask = PL < profile_L
        profile_L[mask] = PL[mask]
        indices_L[mask] = IL[mask]

        # Update top-1 right matrix profile and matrix profile index
        mask = PR < profile_R
        profile_R[mask] = PR[mask]
        indices_R[mask] = IR[mask]

    out = np.empty((l, 2 * k + 2), dtype=object)
    out[:, :k] = profile
    out[:, k : 2 * k + 2] = np.column_stack((indices, indices_L, indices_R))

    return out
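
# Editor's note (not in the PR): the function above follows ray's standard
# scatter/compute/gather pattern -- ray.put() stores each shared array in the
# object store once, ray.remote() wraps the numba-compiled _aamp as a task,
# one task runs per worker over a balanced slice of diagonals, and ray.get()
# blocks until every partial profile is available for merging.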


def aamped(client, T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
"""
Compute the non-normalized (i.e., without z-normalization) matrix profile
with a `dask`/`ray` cluster

This is a highly distributed implementation around the Numba JIT-compiled
parallelized `_aamp` function which computes the non-normalized matrix profile
@@ -170,9 +316,8 @@ def aamped(client, T_A, m, T_B=None, ignore_trivial=True, p=2.0, k=1):
Parameters
----------
client : client
A Dask or Ray Distributed client. Setting up a distributed cluster is beyond
the scope of this library. Please refer to the Dask or Ray Distributed
documentation.
A `dask`/`ray` client. Setting up a cluster is beyond the scope of this library.
Please refer to the `dask`/`ray` documentation.

T_A : numpy.ndarray
The time series or sequence for which to compute the matrix profile
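Editor's note: an end-to-end sketch of calling `aamped` with either client type (toy data; assumes `aamped` is importable from the top-level package):

import numpy as np
import stumpy

T = np.random.rand(10_000)
m = 50

# ray: after ray.init(), the ray module itself is passed as the client,
# matching the module-level ray.put()/remote()/get() calls in _ray_aamped
import ray

ray.init()
mp = stumpy.aamped(ray, T, m)  # self-join; k=1 -> 4 columns (P, I, I_left, I_right)
ray.shutdown()

# dask: the same call accepts a dask.distributed Client instead
from dask.distributed import Client

with Client() as dask_client:
    mp = stumpy.aamped(dask_client, T, m)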