diff --git a/aeon/transformations/collection/imbalance/__init__.py b/aeon/transformations/collection/imbalance/__init__.py new file mode 100644 index 0000000000..d6ee723069 --- /dev/null +++ b/aeon/transformations/collection/imbalance/__init__.py @@ -0,0 +1,7 @@ +"""Supervised transformers to rebalance collections of time series.""" + +__all__ = ["ADASYN", "SMOTE", "OHIT"] + +from aeon.transformations.collection.imbalance._adasyn import ADASYN +from aeon.transformations.collection.imbalance._ohit import OHIT +from aeon.transformations.collection.imbalance._smote import SMOTE diff --git a/aeon/transformations/collection/imbalance/_adasyn.py b/aeon/transformations/collection/imbalance/_adasyn.py new file mode 100644 index 0000000000..fef69ad467 --- /dev/null +++ b/aeon/transformations/collection/imbalance/_adasyn.py @@ -0,0 +1,118 @@ +"""ADASYN over sampling algorithm. + +See more in imblearn.over_sampling.ADASYN +original authors: +# Guillaume Lemaitre +# Fernando Nogueira +# Christos Aridas +# Dzianis Dudnik +# License: MIT +""" + +import numpy as np +from sklearn.utils import check_random_state + +from aeon.transformations.collection.imbalance._smote import SMOTE + +__maintainer__ = ["TonyBagnall"] +__all__ = ["ADASYN"] + + +class ADASYN(SMOTE): + """ + Adaptive Synthetic Sampling (ADASYN) over-sampler. + + Generates synthetic samples for the minority class based on local data + distribution. ADASYN extends SMOTE by adapting the number of synthetic samples + according to the density of the minority class: more samples are generated for + minority samples that are harder to learn (i.e., surrounded by more majority + samples). + + This implementation is adapted from imbalanced-learn's + `imblearn.over_sampling.ADASYN`. + + Parameters + ---------- + random_state : int or None, optional (default=None) + Random seed for reproducibility. + k_neighbors : int, optional (default=5) + Number of nearest neighbours used to construct synthetic samples. + + References + ---------- + .. [1] He, H., Bai, Y., Garcia, E. A., & Li, S. (2008). + ADASYN: Adaptive synthetic sampling approach for imbalanced learning. + In IEEE International Joint Conference on Neural Networks, pp. 1322-1328. + https://doi.org/10.1109/IJCNN.2008.4633969 + + Examples + -------- + >>> from aeon.transformations.collection.imbalance import ADASYN + >>> import numpy as np + >>> X = np.random.random(size=(100,1,50)) + >>> y = np.array([0] * 90 + [1] * 10) + >>> sampler = ADASYN(random_state=49) + >>> X_res, y_res = sampler.fit_transform(X, y) + """ + + def __init__(self, random_state=None, k_neighbors=5): + super().__init__(random_state=random_state, k_neighbors=k_neighbors) + + def _transform(self, X, y=None): + X = np.squeeze(X, axis=1) + random_state = check_random_state(self.random_state) + X_resampled = [X.copy()] + y_resampled = [y.copy()] + + # get the minority class label and the number of samples to generate + for class_sample, n_samples in self.sampling_strategy_.items(): + if n_samples == 0: + continue + target_class_indices = np.flatnonzero(y == class_sample) + X_class = X[target_class_indices] + + self.nn_.fit(X) + nns = self.nn_.kneighbors(X_class, return_distance=False)[:, 1:] + # The ratio is computed using a one-vs-rest manner. Using majority + # in multi-class would lead to slightly different results at the + # cost of introducing a new parameter.
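+ # For each minority sample, ratio_nn below is the fraction of its k + # nearest neighbours (found over the whole training set) that belong to + # a different class. After normalisation, this ratio determines how many + # of the n_samples synthetic cases are seeded from that sample, so + # minority samples surrounded by more majority samples are oversampled + # more heavily.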
+ n_neighbors = self.nn_.n_neighbors - 1 + ratio_nn = np.sum(y[nns] != class_sample, axis=1) / n_neighbors + if not np.sum(ratio_nn): + raise RuntimeError( + "No neighbours belong to the majority" + " class. This case would lead to a NaN value" + " from a division by zero. ADASYN is not" + " suited for this specific dataset." + " Use SMOTE instead." + ) + ratio_nn /= np.sum(ratio_nn) + n_samples_generate = np.rint(ratio_nn * n_samples).astype(int) + # rounding may change the total number of samples to generate + n_samples = np.sum(n_samples_generate) + if not n_samples: + raise ValueError( + "No samples will be generated with the provided ratio settings." + ) + + # the nearest neighbours need to be fitted only on the current class + # to find the within-class neighbours used to generate new samples + self.nn_.fit(X_class) + nns = self.nn_.kneighbors(X_class, return_distance=False)[:, 1:] + + enumerated_class_indices = np.arange(len(target_class_indices)) + rows = np.repeat(enumerated_class_indices, n_samples_generate) + cols = random_state.choice(n_neighbors, size=n_samples) + diffs = X_class[nns[rows, cols]] - X_class[rows] + steps = random_state.uniform(size=(n_samples, 1)) + X_new = X_class[rows] + steps * diffs + + X_new = X_new.astype(X.dtype) + y_new = np.full(n_samples, fill_value=class_sample, dtype=y.dtype) + X_resampled.append(X_new) + y_resampled.append(y_new) + X_resampled = np.vstack(X_resampled) + y_resampled = np.hstack(y_resampled) + + X_resampled = X_resampled[:, np.newaxis, :] + return X_resampled, y_resampled diff --git a/aeon/transformations/collection/imbalance/_ohit.py b/aeon/transformations/collection/imbalance/_ohit.py new file mode 100644 index 0000000000..4d154a2d13 --- /dev/null +++ b/aeon/transformations/collection/imbalance/_ohit.py @@ -0,0 +1,288 @@ +"""OHIT over sampling algorithm. + +An adaptation of the oversampling method based on DRSNN clustering. + +Original authors: +# zhutuanfei +""" + +from collections import OrderedDict + +import numpy as np +from scipy.stats import multivariate_normal +from sklearn.covariance import ledoit_wolf +from sklearn.neighbors import NearestNeighbors +from sklearn.utils import check_random_state + +from aeon.transformations.collection import BaseCollectionTransformer + +__all__ = ["OHIT"] + + +class OHIT(BaseCollectionTransformer): + """ + Over-sampling based on High-density region and Iterative Thresholding (OHIT). + + OHIT generates synthetic minority class samples based on the Density-Ratio Shared + Nearest Neighbor (DRSNN) clustering algorithm. It identifies high-density regions + among the minority class using DRSNN, then produces synthetic samples within + these clusters. Covariance estimation for high-dimensional data is performed using + shrinkage techniques. + + The DRSNN procedure involves three main parameters: + - `drT`: the density ratio threshold (typically set around 1). + - `k`: the nearest neighbour parameter in shared nearest neighbour similarity. + - `kapa`: the nearest neighbour parameter in defining density ratio. + + `k` and `kapa` should be set in a complementary manner to avoid cluster merging + and dissociation. Typically, a large `k` is paired with a relatively low `kapa`. + + Parameters + ---------- + k : int or None, optional + The nearest neighbour parameter for SNN similarity. + If None, set to int(np.ceil(n ** 0.5 * 1.25)), where n is the number of + minority samples. + kapa : int or None, optional + The nearest neighbour parameter for defining the density ratio.
+ If None, set to int(np.ceil(n ** 0.5)), where n is the number of minority + samples. + drT : float, default=0.9 + Threshold for the density ratio in DRSNN clustering. + distance : str or callable, default='euclidean' + Distance metric to use for KNN in SNN similarity computation. + random_state : int, RandomState instance or None, default=None + Controls random number generation for reproducibility: + - If `int`, sets the random seed. + - If `RandomState` instance, uses it as the generator. + - If `None`, uses `np.random`. + + References + ---------- + .. [1] T. Zhu, C. Luo, Z. Zhang, J. Li, S. Ren, and Y. Zeng. Minority + oversampling for imbalanced time series classification. Knowledge-Based Systems, + 247:108764, 2022. + + Examples + -------- + >>> from aeon.transformations.collection.imbalance import OHIT + >>> from aeon.testing.data_generation import make_example_3d_numpy + >>> import numpy as np + >>> X = make_example_3d_numpy(n_cases=100, return_y=False, random_state=49) + >>> y = np.array([0] * 90 + [1] * 10) + >>> sampler = OHIT(random_state=49) + >>> X_res, y_res = sampler.fit_transform(X, y) + >>> y_res.shape + (180,) + """ + + _tags = { + "requires_y": True, + } + + def __init__( + self, k=None, kapa=None, drT=0.9, distance="euclidean", random_state=None + ): + self.k = k + self.kapa = kapa + self.drT = drT + self.distance = distance + self.random_state = random_state + super().__init__() + + def _fit(self, X, y=None): + + unique, counts = np.unique(y, return_counts=True) + target_stats = dict(zip(unique, counts)) + n_sample_majority = max(target_stats.values()) + class_majority = max(target_stats, key=target_stats.get) + sampling_strategy = { + key: n_sample_majority - value + for (key, value) in target_stats.items() + if key != class_majority + } + self.sampling_strategy_ = OrderedDict(sorted(sampling_strategy.items())) + + return self + + def _transform(self, X, y=None): + X = np.squeeze(X, axis=1) + X_resampled = [X.copy()] + y_resampled = [y.copy()] + + for class_sample, n_samples in self.sampling_strategy_.items(): + if n_samples == 0: + continue + target_class_indices = np.flatnonzero(y == class_sample) + if len(target_class_indices) == 1: + X_new = np.tile(X[target_class_indices], (n_samples, 1)) + y_new = np.full(n_samples, fill_value=class_sample, dtype=y.dtype) + X_resampled.append(X_new) + y_resampled.append(y_new) + continue + X_class = X[target_class_indices] + n, m = X_class.shape + # set the default value of k and kapa + if self.k is None: + self.k = int(np.ceil(n**0.5 * 1.25)) + if self.kapa is None: + self.kapa = int(np.ceil(n**0.5)) + + # Initialize NearestNeighbors for SNN similarity + self.nn_ = NearestNeighbors(metric=self.distance, n_neighbors=self.k + 1) + + clusters, cluster_label = self._cluster_minority(X_class) + Me, eigen_matrices, eigen_values = self._covStruct(X_class, clusters) + + # allocate the number of synthetic samples to be generated for each cluster + random_state = check_random_state(self.random_state) + os_ind = np.tile(np.arange(0, n), int(np.floor(n_samples / n))) + remaining = random_state.choice( + np.arange(0, n), + n_samples - n * int(np.floor(n_samples / n)), + replace=False, + ) + os_ind = np.concatenate([os_ind, remaining]) + R = 1.25 if len(clusters) > 1 else 1.1 + + # generate the structure-preserving synthetic samples for each cluster + X_new = np.zeros((n_samples, m)) + count = 0 + X_class_0 = X_class[cluster_label == 0] + if X_class_0.size != 0: + gen_0 = np.sum(np.isin(os_ind, np.where(cluster_label == 0)[0])) + idx_0 = 
random_state.choice(len(X_class_0), gen_0, replace=True) + X_new[count : count + gen_0, :] = X_class_0[idx_0] + count += gen_0 + for i, _ in enumerate(clusters): + gen_i = np.sum(np.isin(os_ind, np.where(cluster_label == (i + 1))[0])) + X_new[count : count + gen_i, :] = self._generate_synthetic_samples( + Me[i], eigen_matrices[i], eigen_values[i], gen_i, R + ) + count += gen_i + + assert count == n_samples + X_resampled.append(X_new) + y_new = np.full(n_samples, fill_value=class_sample, dtype=y.dtype) + y_resampled.append(y_new) + + X_resampled = np.vstack(X_resampled) + y_resampled = np.hstack(y_resampled) + X_resampled = X_resampled[:, np.newaxis, :] + return X_resampled, y_resampled + + def _cluster_minority(self, X): + """Apply DRSNN clustering on minority class samples.""" + n = X.shape[0] + k = self.k + kapa = self.kapa + drT = self.drT + + self.nn_.fit(X) + neighbors = self.nn_.kneighbors(X, return_distance=False)[:, 1:] + # construct the shared nearest neighbor similarity + strength = np.zeros((n, n)) + for i in range(n): + for j in range(i + 1, n): + shared_nn = np.intersect1d(neighbors[i, :k], neighbors[j, :k]) + strength[i, j] = strength[j, i] = np.sum( + (k + 1 - np.searchsorted(neighbors[i, :k], shared_nn)) + * (k + 1 - np.searchsorted(neighbors[j, :k], shared_nn)) + ) + + # construct the shared nearest neighbor graph + strength_nn = np.sort(strength, axis=1)[:, ::-1][:, :k] + idx_nn = np.argsort(strength, axis=1)[:, ::-1] + graph = np.zeros((n, k)) + for i in range(n): + for j in range(k): + if np.any(idx_nn[idx_nn[i, j], :k] == i): + graph[i, j] = 1 + + density = np.sum(strength_nn * graph, axis=1) + density_ratio = np.zeros(n) + for i in range(n): + non_noise = np.where(density[idx_nn[i, :kapa]] != 0)[0] + if non_noise.size == 0: + density_ratio[i] = 0 + else: + density_ratio[i] = density[i] / np.mean(density[idx_nn[i, non_noise]]) + + # identify core points + core_idx = np.where(density_ratio > drT)[0] + # find directly density-reachable samples for each core point + neighborhood = {core: set(idx_nn[core, :kapa]) for core in core_idx} + for i in core_idx: + for j in core_idx: + if np.any(idx_nn[j, :kapa] == i): + neighborhood[i].add(j) + neighborhood = {key: list(value) for key, value in neighborhood.items()} + + clusters = [] + cluster_label = np.zeros(len(neighbors), dtype=int) + cluster_id = 0 + + for i in core_idx: + if cluster_label[i] == 0: + cluster_id += 1 + seed = [i] + clusters.append(set(seed)) + while seed: + point = seed.pop(0) + idx = np.where(core_idx == point)[0] + if idx.size > 0 and cluster_label[point] == 0: + seed.extend(neighborhood[point]) + clusters[-1].update(neighborhood[point]) + cluster_label[point] = cluster_id + # no cluster has been found, the whole samples are taken as one cluster + if len(clusters) == 0: + clusters.append(list(range(n))) + cluster_label = np.ones(n, dtype=int) + return clusters, cluster_label + + def _covStruct(self, data, clusters): + """Calculate the covariance matrix of the minority samples.""" + Me, Eigen_matrices, Eigen_values = [], [], [] + for cluster in clusters: + cluster = list(cluster) + cluster_data = data[cluster] + sigma, shrinkage = ledoit_wolf(cluster_data) + me = np.mean(cluster_data, axis=0) + eigenValues, eigenVectors = np.linalg.eigh(sigma) + eigenValues = np.diag(eigenValues) + Me.append(me) + Eigen_matrices.append(eigenVectors) + Eigen_values.append(eigenValues) + return Me, Eigen_matrices, Eigen_values + + def _generate_synthetic_samples(self, Me, eigenMatrix, eigenValue, eta, R): + """Generate 
synthetic samples based on clustered minority samples.""" + # Initialize the output sample generator and probability arrays + n_samples = int(np.ceil(eta * R)) + SampGen = np.zeros((n_samples, len(Me))) + Prob = np.zeros(n_samples) + + # Calculate the square root of the absolute eigenvalues + DD = np.sqrt(np.abs(np.diag(eigenValue))) + DD = DD.reshape(1, -1) + + # Initialize mean and covariance for the multivariate normal distribution + Mu = np.zeros(len(Me)) + Sigma = np.eye(len(Me)) + + for cnt in range(n_samples): + # Generate a sample from the multivariate normal distribution + S = np.random.multivariate_normal(Mu, Sigma, 1) + Prob[cnt] = multivariate_normal.pdf(S, Mu, Sigma) + + # Scale the sample with the eigenvalues + S = S * DD + # Generate the final sample by applying the eigenvector matrix + x = S @ eigenMatrix.T + Me + SampGen[cnt, :] = x + + # Sort the samples based on the probability in descending order + sorted_indices = np.argsort(Prob)[::-1] + SampGen = SampGen[sorted_indices[:eta], :] + + return SampGen diff --git a/aeon/transformations/collection/imbalance/_smote.py b/aeon/transformations/collection/imbalance/_smote.py new file mode 100644 index 0000000000..f6e7062f2f --- /dev/null +++ b/aeon/transformations/collection/imbalance/_smote.py @@ -0,0 +1,252 @@ +"""SMOTE over sampling algorithm. + +See more in imblearn.over_sampling.SMOTE +original authors: +# Guillaume Lemaitre +# Fernando Nogueira +# Christos Aridas +# Dzianis Dudnik +# License: MIT +""" + +from collections import OrderedDict + +import numpy as np +from sklearn.neighbors import NearestNeighbors +from sklearn.utils import check_random_state + +from aeon.transformations.collection import BaseCollectionTransformer + +__maintainer__ = ["TonyBagnall"] +__all__ = ["SMOTE"] + + +class SMOTE(BaseCollectionTransformer): + """ + Synthetic Minority Over-sampling TEchnique (SMOTE) for imbalanced datasets. + + Generates synthetic samples of the minority class to address class imbalance. + SMOTE constructs new samples by interpolating between existing minority samples + and their nearest neighbours in feature space. + + This implementation adapts the algorithm from `imblearn.over_sampling.SMOTE`. + It targets all classes except the majority, as controlled by the `sampling_strategy` + in the `_fit` method. It uses ``aeon`` distances to find the nearest neighbours. + + Parameters + ---------- + k_neighbors : int, default=5 + Number of nearest neighbours used to generate synthetic samples. A + `sklearn.neighbors.NearestNeighbors` instance is fitted for this purpose. + random_state : int, RandomState instance or None, default=None + Controls the random number generation for reproducibility: + - If `int`, sets the random seed. + - If `RandomState` instance, uses it as the generator. + - If `None`, uses `np.random`. + + See Also + -------- + ADASYN : Adaptive synthetic sampling extension to SMOTE. + + References + ---------- + .. [1] Chawla, N. V., Bowyer, K. W., Hall, L. O., & Kegelmeyer, W. P. (2002). + SMOTE: Synthetic minority over-sampling technique. + Journal of Artificial Intelligence Research, 16, 321–357. 
+ https://dl.acm.org/doi/10.5555/1622407.1622416 + + Examples + -------- + >>> from aeon.transformations.collection.imbalance import SMOTE + >>> from aeon.testing.data_generation import make_example_3d_numpy + >>> import numpy as np + >>> X = make_example_3d_numpy(n_cases=100, return_y=False, random_state=49) + >>> y = np.array([0] * 90 + [1] * 10) + >>> sampler = SMOTE(random_state=49) + >>> X_res, y_res = sampler.fit_transform(X, y) + >>> y_res.shape + (180,) + """ + + _tags = { + "requires_y": True, + } + + def __init__(self, k_neighbors: int = 5, random_state=None): + self.random_state = random_state + self.k_neighbors = k_neighbors + super().__init__() + + def _fit(self, X, y=None): + # set the additional_neighbor required by SMOTE + self.nn_ = NearestNeighbors(n_neighbors=self.k_neighbors + 1) + + # generate sampling target by targeting all classes except the majority + unique, counts = np.unique(y, return_counts=True) + target_stats = dict(zip(unique, counts)) + n_sample_majority = max(target_stats.values()) + class_majority = max(target_stats, key=target_stats.get) + sampling_strategy = { + key: n_sample_majority - value + for (key, value) in target_stats.items() + if key != class_majority + } + self.sampling_strategy_ = OrderedDict(sorted(sampling_strategy.items())) + return self + + def _transform(self, X, y=None): + # remove the channel dimension to be compatible with sklearn + X = np.squeeze(X, axis=1) + X_resampled = [X.copy()] + y_resampled = [y.copy()] + + # got the minority class label and the number needs to be generated + for class_sample, n_samples in self.sampling_strategy_.items(): + if n_samples == 0: + continue + target_class_indices = np.flatnonzero(y == class_sample) + X_class = X[target_class_indices] + + self.nn_.fit(X_class) + nns = self.nn_.kneighbors(X_class, return_distance=False)[:, 1:] + X_new, y_new = self._make_samples( + X_class, y.dtype, class_sample, X_class, nns, n_samples, 1.0 + ) + X_resampled.append(X_new) + y_resampled.append(y_new) + X_resampled = np.vstack(X_resampled) + y_resampled = np.hstack(y_resampled) + X_resampled = X_resampled[:, np.newaxis, :] + return X_resampled, y_resampled + + def _make_samples( + self, X, y_dtype, y_type, nn_data, nn_num, n_samples, step_size=1.0, y=None + ): + """Make artificial samples constructed based on nearest neighbours. + + Parameters + ---------- + X : np.ndarray + Shape (n_cases, n_timepoints), time series from which the new series will + be created. + + y_dtype : dtype + The data type of the targets. + + y_type : str or int + The minority target value, just so the function can return the + target values for the synthetic variables with correct length in + a clear format. + + nn_data : ndarray of shape (n_samples_all, n_features) + Data set carrying all the neighbours to be used + + nn_num : ndarray of shape (n_samples_all, k_nearest_neighbours) + The nearest neighbours of each sample in `nn_data`. + + n_samples : int + The number of samples to generate. + + step_size : float, default=1.0 + The step size to create samples. + + y : ndarray of shape (n_samples_all,), default=None + The true target associated with `nn_data`. Used by Borderline SMOTE-2 to + weight the distances in the sample generation process. + + Returns + ------- + X_new : ndarray + Synthetically generated samples of shape (n_samples_new, n_timepoints). + + y_new : ndarray + Target values for synthetic samples of shape (n_samples_new,). 
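+ + Notes + ----- + Base samples and neighbours are chosen by drawing flat indices uniformly + from the `nn_num` matrix: each drawn index is mapped to a row (the base + sample) and a column (which of its nearest neighbours to interpolate + towards), and a uniform step in [0, 1) scaled by `step_size` is applied.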
+ """ + random_state = check_random_state(self.random_state) + samples_indices = random_state.randint(low=0, high=nn_num.size, size=n_samples) + + # np.newaxis for backwards compatability with random_state + steps = step_size * random_state.uniform(size=n_samples)[:, np.newaxis] + rows = np.floor_divide(samples_indices, nn_num.shape[1]) + cols = np.mod(samples_indices, nn_num.shape[1]) + + X_new = self._generate_samples(X, nn_data, nn_num, rows, cols, steps, y_type, y) + y_new = np.full(n_samples, fill_value=y_type, dtype=y_dtype) + return X_new, y_new + + def _generate_samples( + self, X, nn_data, nn_num, rows, cols, steps, y_type=None, y=None + ): + r"""Generate a synthetic sample. + + The rule for the generation is: + + .. math:: + \mathbf{s_{s}} = \mathbf{s_{i}} + \mathcal{u}(0, 1) \times + (\mathbf{s_{i}} - \mathbf{s_{nn}}) \, + + where \mathbf{s_{s}} is the new synthetic samples, \mathbf{s_{i}} is + the current sample, \mathbf{s_{nn}} is a randomly selected neighbors of + \mathbf{s_{i}} and \mathcal{u}(0, 1) is a random number between [0, 1). + + Parameters + ---------- + X : np.ndarray + Series from which the points will be created of shape (n_cases, + n_timepoints). + nn_data : ndarray of shape (n_samples_all, n_features) + Data set carrying all the neighbours to be used. + nn_num : ndarray of shape (n_samples_all, k_nearest_neighbours) + The nearest neighbours of each sample in `nn_data`. + rows : ndarray of shape (n_samples,), dtype=int + Indices pointing at feature vector in X which will be used + as a base for creating new samples. + cols : ndarray of shape (n_samples,), dtype=int + Indices pointing at which nearest neighbor of base feature vector + will be used when creating new samples. + steps : ndarray of shape (n_samples,), dtype=float + Step sizes for new samples. + y_type : str, int or None, default=None + Class label of the current target classes for which we want to generate + samples. + y : ndarray of shape (n_samples_all,), default=None + The true target associated with `nn_data`. Used by Borderline SMOTE-2 to + weight the distances in the sample generation process. + + Returns + ------- + X_new : {ndarray, sparse matrix} of shape (n_samples, n_features) + Synthetically generated samples. + """ + diffs = nn_data[nn_num[rows, cols]] - X[rows] + if y is not None: + random_state = check_random_state(self.random_state) + mask_pair_samples = y[nn_num[rows, cols]] != y_type + diffs[mask_pair_samples] *= random_state.uniform( + low=0.0, high=0.5, size=(mask_pair_samples.sum(), 1) + ) + X_new = X[rows] + steps * diffs + return X_new.astype(X.dtype) + + @classmethod + def _get_test_params(cls, parameter_set="default"): + """Return testing parameter settings for the estimator. + + Parameters + ---------- + parameter_set : str, default="default" + Name of the set of test parameters to return, for use in tests. If no + special parameters are defined for a value, will return `"default"` set. + ClassifierChannelEnsemble provides the following special sets: + - "results_comparison" - used in some classifiers to compare against + previously generated results where the default set of parameters + cannot produce suitable probability estimates + + Returns + ------- + params : dict or list of dict, default={} + Parameters to create testing instances of the class. + Each dict are parameters to construct an "interesting" test instance, i.e., + `MyClass(**params)` or `MyClass(**params[i])` creates a valid test instance. 
+ """ + return {"k_neighbors": 1} diff --git a/aeon/transformations/collection/imbalance/tests/__init__.py b/aeon/transformations/collection/imbalance/tests/__init__.py new file mode 100644 index 0000000000..55831a6ec8 --- /dev/null +++ b/aeon/transformations/collection/imbalance/tests/__init__.py @@ -0,0 +1 @@ +"""Test resampling transformers.""" diff --git a/aeon/transformations/collection/imbalance/tests/test_adasyn.py b/aeon/transformations/collection/imbalance/tests/test_adasyn.py new file mode 100644 index 0000000000..0bb5c62ea6 --- /dev/null +++ b/aeon/transformations/collection/imbalance/tests/test_adasyn.py @@ -0,0 +1,61 @@ +"""Test ADASYN oversampler ported from imblearn.""" + +import numpy as np +import pytest + +from aeon.testing.data_generation import make_example_3d_numpy +from aeon.transformations.collection.imbalance import ADASYN +from aeon.utils.validation._dependencies import _check_soft_dependencies + + +def test_adasyn(): + """Test the ADASYN class. + + This function creates a 3D numpy array, applies + ADASYN using the ADASYN class, and asserts that the + transformed data has a balanced number of samples. + ADASYN is a variant of SMOTE that generates synthetic samples, + but it focuses on generating samples near the decision boundary. + Therefore, sometimes, it may generate more or less samples than SMOTE, + which is why we only check if the number of samples is nearly balanced. + """ + n_samples = 100 # Total number of labels + majority_num = 90 # number of majority class + minority_num = n_samples - majority_num # number of minority class + + X = np.random.rand(n_samples, 1, 10) + y = np.array([0] * majority_num + [1] * minority_num) + + transformer = ADASYN() + transformer.fit(X, y) + res_X, res_y = transformer.transform(X, y) + _, res_count = np.unique(res_y, return_counts=True) + + assert np.abs(len(res_X) - 2 * majority_num) < minority_num + assert np.abs(len(res_y) - 2 * majority_num) < minority_num + assert res_count[0] == majority_num + assert np.abs(res_count[0] - res_count[1]) < minority_num + + +@pytest.mark.skipif( + not _check_soft_dependencies( + "imbalanced-learn", + package_import_alias={"imbalanced-learn": "imblearn"}, + severity="none", + ), + reason="skip test if required soft dependency imbalanced-learn not available", +) +def test_equivalence_imbalance(): + """Test ported ADASYN code produces the same as imblearn version.""" + from imblearn.over_sampling import ADASYN as imbADASYN + + X, y = make_example_3d_numpy(n_cases=20, n_channels=1) + y = np.array([0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) + X = X.squeeze() + s1 = imbADASYN(random_state=49) + X2, y2 = s1.fit_resample(X, y) + s2 = ADASYN(random_state=49) + X3, y3 = s2.fit_transform(X, y) + X3 = X3.squeeze() + assert np.array_equal(y2, y3) + assert np.allclose(X2, X3, atol=1e-4) diff --git a/aeon/transformations/collection/imbalance/tests/test_ohit.py b/aeon/transformations/collection/imbalance/tests/test_ohit.py new file mode 100644 index 0000000000..7162aab894 --- /dev/null +++ b/aeon/transformations/collection/imbalance/tests/test_ohit.py @@ -0,0 +1,30 @@ +"""Test function for OHIT.""" + +import numpy as np + +from aeon.transformations.collection.imbalance import OHIT + + +def test_ohit(): + """Test the OHIT class. + + This function creates a 3D numpy array, applies + OHIT using the OHIT class, and asserts that the + transformed data has a balanced number of samples. 
+ """ + n_samples = 100 # Total number of labels + majority_num = 90 # number of majority class + minority_num = n_samples - majority_num # number of minority class + + X = np.random.rand(n_samples, 1, 10) + y = np.array([0] * majority_num + [1] * minority_num) + + transformer = OHIT() + transformer.fit(X, y) + res_X, res_y = transformer.transform(X, y) + _, res_count = np.unique(res_y, return_counts=True) + + assert len(res_X) == 2 * majority_num + assert len(res_y) == 2 * majority_num + assert res_count[0] == majority_num + assert res_count[1] == majority_num diff --git a/aeon/transformations/collection/imbalance/tests/test_smote.py b/aeon/transformations/collection/imbalance/tests/test_smote.py new file mode 100644 index 0000000000..70189633d0 --- /dev/null +++ b/aeon/transformations/collection/imbalance/tests/test_smote.py @@ -0,0 +1,57 @@ +"""Test function for SMOTE.""" + +import numpy as np +import pytest + +from aeon.testing.data_generation import make_example_3d_numpy +from aeon.transformations.collection.imbalance import SMOTE +from aeon.utils.validation._dependencies import _check_soft_dependencies + + +def test_smote(): + """Test the SMOTE class. + + This function creates a 3D numpy array, applies + SMOTE using the SMOTE class, and asserts that the + transformed data has a balanced number of samples. + """ + n_samples = 100 # Total number of labels + majority_num = 90 # number of majority class + minority_num = n_samples - majority_num # number of minority class + + X = np.random.rand(n_samples, 1, 10) + y = np.array([0] * majority_num + [1] * minority_num) + + transformer = SMOTE() + transformer.fit(X, y) + res_X, res_y = transformer.transform(X, y) + _, res_count = np.unique(res_y, return_counts=True) + + assert len(res_X) == 2 * majority_num + assert len(res_y) == 2 * majority_num + assert res_count[0] == majority_num + assert res_count[1] == majority_num + + +@pytest.mark.skipif( + not _check_soft_dependencies( + "imbalanced-learn", + package_import_alias={"imbalanced-learn": "imblearn"}, + severity="none", + ), + reason="skip test if required soft dependency imbalanced-learn not available", +) +def test_equivalence_imbalance(): + """Test ported SMOTE code produces the same as imblearn version.""" + from imblearn.over_sampling import SMOTE as imbSMOTE + + X, y = make_example_3d_numpy(n_cases=20, n_channels=1) + y = np.array([0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]) + X = X.squeeze() + s1 = imbSMOTE(random_state=49) + X2, y2 = s1.fit_resample(X, y) + s2 = SMOTE(random_state=49) + X3, y3 = s2.fit_transform(X, y) + X3 = X3.squeeze() + assert np.array_equal(y2, y3) + assert np.allclose(X2, X3, atol=1e-4) diff --git a/docs/developer_guide/deprecation.md b/docs/developer_guide/deprecation.md index 4b10d81cb2..04aadbab3a 100644 --- a/docs/developer_guide/deprecation.md +++ b/docs/developer_guide/deprecation.md @@ -24,6 +24,7 @@ experimental. Currently experimental modules are: - `segmentation` - `similarity_search` - `visualisation` +- `transformations.collection.imbalance` When we introduce a new module, we may classify it as experimental until the API is stable. We will try to not make drastic changes to experimental modules, but we need