diff --git a/aeon/classification/distance_based/_time_series_neighbors.py b/aeon/classification/distance_based/_time_series_neighbors.py
index f89b1be636..b706d7392e 100644
--- a/aeon/classification/distance_based/_time_series_neighbors.py
+++ b/aeon/classification/distance_based/_time_series_neighbors.py
@@ -13,9 +13,11 @@
 from typing import Callable, Union
 
 import numpy as np
+from joblib import Parallel, delayed
 
 from aeon.classification.base import BaseClassifier
 from aeon.distances import get_distance_function
+from aeon.utils.validation import check_n_jobs
 
 WEIGHTS_SUPPORTED = ["uniform", "distance"]
 
@@ -46,11 +48,15 @@ class KNeighborsTimeSeriesClassifier(BaseClassifier):
         n_timepoints)`` as input and returns a float.
     distance_params : dict, default = None
         Dictionary for metric parameters for the case that distance is a str.
-    n_jobs : int, default = None
+    n_jobs : int, default = 1
         The number of parallel jobs to run for neighbors search.
         ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
         ``-1`` means using all processors.
-        for more details. Parameter for compatibility purposes, still unimplemented.
+    parallel_backend : str, ParallelBackendBase instance or None, default=None
+        Specify the parallelisation backend implementation in joblib; if None,
+        a "prefer" value of "threads" is used by default. Valid options are
+        "loky", "multiprocessing", "threading" or a custom backend.
+        See the joblib Parallel documentation for more details.
 
     Examples
     --------
@@ -79,11 +85,13 @@ def __init__(
         self,
         n_neighbors: int = 1,
         weights: Union[str, Callable] = "uniform",
         n_jobs: int = 1,
+        parallel_backend: str = None,
     ) -> None:
         self.distance = distance
         self.distance_params = distance_params
         self.n_neighbors = n_neighbors
         self.n_jobs = n_jobs
+        self.parallel_backend = parallel_backend
         self._distance_params = distance_params
         if self._distance_params is None:
@@ -134,16 +142,11 @@ def _predict_proba(self, X):
             The class probabilities of the input samples. Classes are ordered
             by lexicographic order.
         """
-        preds = np.zeros((len(X), len(self.classes_)))
-        for i in range(len(X)):
-            idx, weights = self._kneighbors(X[i])
-            for id, w in zip(idx, weights):
-                predicted_class = self.y_[id]
-                preds[i, predicted_class] += w
-
-            preds[i] = preds[i] / np.sum(preds[i])
-
-        return preds
+        n_jobs = check_n_jobs(self.n_jobs)
+        preds = Parallel(n_jobs=n_jobs, backend=self.parallel_backend)(
+            delayed(self._proba_row)(x) for x in X
+        )
+        return np.array(preds)
 
     def _predict(self, X):
         """
@@ -161,19 +164,27 @@ def _predict(self, X):
         y : array of shape (n_cases)
             Class labels for each data sample.
         """
-        self._check_is_fitted()
-
-        preds = np.empty(len(X), dtype=self.classes_.dtype)
-        for i in range(len(X)):
-            scores = np.zeros(len(self.classes_))
-            idx, weights = self._kneighbors(X[i])
-            for id, w in zip(idx, weights):
-                predicted_class = self.y_[id]
-                scores[predicted_class] += w
-
-            preds[i] = self.classes_[np.argmax(scores)]
-
-        return preds
+        n_jobs = check_n_jobs(self.n_jobs)
+        preds = Parallel(n_jobs=n_jobs, backend=self.parallel_backend)(
+            delayed(self._predict_row)(x) for x in X
+        )
+        return np.array(preds, dtype=self.classes_.dtype)
+
+    def _proba_row(self, x):
+        scores = self._predict_scores(x)
+        return scores / np.sum(scores)
+
+    def _predict_row(self, x):
+        scores = self._predict_scores(x)
+        return self.classes_[np.argmax(scores)]
+
+    def _predict_scores(self, x):
+        scores = np.zeros(len(self.classes_))
+        idx, weights = self._kneighbors(x)
+        for id, weight in zip(idx, weights):
+            predicted_class = self.y_[id]
+            scores[predicted_class] += weight
+        return scores
 
     def _kneighbors(self, X):
         """