diff --git a/aeon/distances/tests/test_symbolic_mindist.py b/aeon/distances/tests/test_symbolic_mindist.py
index 2d81e04ddb..c545b07140 100644
--- a/aeon/distances/tests/test_symbolic_mindist.py
+++ b/aeon/distances/tests/test_symbolic_mindist.py
@@ -170,3 +170,56 @@ def test_sfa_whole_mindist():
         assert mindist_sfa <= ed
         assert mindist_dft_sfa >= mindist_sfa  # a tighter lower bound
         assert mindist_dft_sfa <= ed
+
+
+def test_dynamic_alphabet_allocation():
+    """Test the SFA Min-Distance function."""
+    n_segments = 16
+    alphabet_size = 64
+
+    X_train, _ = load_unit_test("TRAIN")
+    X_test, _ = load_unit_test("TEST")
+
+    X_train = zscore(X_train.squeeze(), axis=1)
+    X_test = zscore(X_test.squeeze(), axis=1)
+    histogram_type = "equi-width"
+
+    # print("Testing")
+    for alphabet_allocation_method in {
+        "linear_scale",
+        "log_scale",
+        "sqrt_scale",
+    }:
+        sfa = SFAWhole(
+            word_length=n_segments,
+            alphabet_size=alphabet_size,
+            binning_method=histogram_type,
+            alphabet_allocation_method=alphabet_allocation_method,
+            variance=True,  # True gives a tighter lower bound
+            norm=True,
+        )
+
+        X_train_words, X_train_dfts = sfa.fit_transform(X_train)
+        X_test_words, _ = sfa.transform(X_test)
+
+        for i in range(min(X_train.shape[0], X_test.shape[0])):
+            X = X_train[i].reshape(1, -1)
+            Y = X_test[i].reshape(1, -1)
+
+            # SFA Min-Distance
+            mindist_sfa = mindist_sfa_distance(
+                X_train_words[i], X_test_words[i], sfa.breakpoints
+            )
+
+            # DFT-SFA Min-Distance
+            mindist_dft_sfa = mindist_dft_sfa_distance(
+                X_train_dfts[i], X_test_words[i], sfa.breakpoints
+            )
+
+            # Euclidean Distance
+            ed = np.linalg.norm(X[0] - Y[0])
+
+            assert np.mean(np.log2(sfa.alphabet_sizes)) == np.log2(alphabet_size)
+            assert mindist_sfa <= ed
+            assert mindist_dft_sfa >= mindist_sfa  # a tighter lower bound
+            assert mindist_dft_sfa <= ed
diff --git a/aeon/transformations/collection/dictionary_based/_sfa_fast.py b/aeon/transformations/collection/dictionary_based/_sfa_fast.py
index c118ce3878..1f8afe8db4 100644
--- a/aeon/transformations/collection/dictionary_based/_sfa_fast.py
+++ b/aeon/transformations/collection/dictionary_based/_sfa_fast.py
@@ -41,6 +41,12 @@
     "quantile",
 }
 
+alphabet_allocation_methods = {
+    "linear_scale",
+    "log_scale",
+    "sqrt_scale",
+}
+
 simplefilter(action="ignore", category=NumbaPendingDeprecationWarning)
 simplefilter(action="ignore", category=NumbaTypeSafetyWarning)
 
@@ -65,6 +71,9 @@ class SFAFast(BaseCollectionTransformer):
         Number of values to discretise each value to.
     window_size : int, default = 12
         Size of window for sliding. Input series length for whole series transform.
+    alphabet_allocation_method : str, default = None
+        The method used to learn the dynamic alphabet sizes. One of
+        {"linear_scale", "log_scale", "sqrt_scale"}.
     norm : boolean, default = False
         Mean normalise words by dropping first fourier coefficient.
     binning_method : str, default="equi-depth"
@@ -72,12 +81,12 @@
         "equi-width", "information-gain", "information-gain-mae", "kmeans"},
     anova : boolean, default = False
         If True, the Fourier coefficient selection is done via a one-way ANOVA test.
-        If False, the first Fourier coefficients are selected. Only applicable if
-        labels are given.
+        If False, the first Fourier coefficients are selected.
+        It is a supervised feature selection strategy. Only applicable if labels given.
     variance : boolean, default = False
         If True, the Fourier coefficient selection is done via the largest variance.
-        If False, the first Fourier coefficients are selected. Only applicable if
-        labels are given.
+        If False, the first Fourier coefficients are selected.
+        It is an unsupervised feature selection strategy.
     dilation : int, default = 0
         When set to dilation > 1, adds dilation to the sliding window operation.
     save_words : boolean, default = False
@@ -143,6 +152,7 @@ def __init__(
         word_length=8,
         alphabet_size=4,
         window_size=12,
+        alphabet_allocation_method=None,
        norm=False,
         binning_method="equi-depth",
         anova=False,
@@ -170,6 +180,7 @@
 
         self.word_length = word_length
         self.alphabet_size = alphabet_size
+        self.alphabet_sizes = None
         self.window_size = window_size
         self.norm = norm
 
@@ -192,11 +203,13 @@
 
         self.n_cases = 0
         self.n_timepoints = 0
-        self.letter_bits = 0
+        self.letter_bits = None
 
         self.dilation = dilation
         self.first_difference = first_difference
         self.sampling_factor = sampling_factor
+        self.alphabet_allocation_method = alphabet_allocation_method
+        self.learn_alphabet_sizes = self.alphabet_allocation_method is not None
 
         # Feature selection part
         self.feature_selection = feature_selection
@@ -230,6 +243,14 @@ def _fit_transform(self, X, y=None, return_bag_of_words=True):
                 "Please set either variance or anova Fourier coefficient selection"
             )
 
+        if self.learn_alphabet_sizes and (
+            self.alphabet_allocation_method not in alphabet_allocation_methods
+        ):
+            raise ValueError(
+                "alphabet_allocation_method must be one of: ",
+                alphabet_allocation_methods,
+            )
+
         if self.binning_method not in binning_methods:
             raise TypeError("binning_method must be one of: ", binning_methods)
 
@@ -259,13 +280,16 @@
             if (self.anova or self.variance) is True
             else self.word_length_actual
         )
 
+        # make dft_length an even number (same number of reals and imags)
         self.dft_length = self.dft_length + self.dft_length % 2
         self.word_length_actual = self.word_length_actual + self.word_length_actual % 2
 
         self.support = np.arange(self.word_length_actual)
-        self.letter_bits = np.uint32(math.ceil(math.log2(self.alphabet_size)))
-        # self.word_bits = self.word_length_actual * self.letter_bits
+
+        self.letter_bits = np.zeros(self.word_length_actual, dtype=np.uint32)
+        self.letter_bits[:] = np.uint32(math.ceil(math.log2(self.alphabet_size)))
+
         X = X.squeeze(1)
 
         # subsample the samples
@@ -538,10 +562,10 @@ def _binning(self, X, y=None):
 
         if self.variance:
             # determine variance
-            dft_variance = np.var(dft, axis=0)
+            self.dft_variance = np.var(dft, axis=0)
 
             # select word-length-many indices with the largest variance
-            self.support = np.argsort(-dft_variance)[: self.word_length_actual]
+            self.support = np.argsort(-self.dft_variance)[: self.word_length_actual]
 
             # sort remaining indices
             self.support = np.sort(self.support)
@@ -569,6 +593,36 @@
             self.dft_length = np.max(self.support) + 1
             self.dft_length = self.dft_length + self.dft_length % 2  # even
 
+        # learn alphabet sizes
+        if self.learn_alphabet_sizes and self.alphabet_allocation_method:
+            if not hasattr(self, "dft_variance"):
+                self.dft_variance = np.var(dft, axis=0)
+
+            symbols = np.log2(self.alphabet_size)
+            self.bit_budget = int(symbols * self.word_length)
+
+            if self.alphabet_allocation_method == "linear_scale":
+                variance = self.dft_variance[self.support]
+                normed_scale = variance / variance.mean()
+            elif self.alphabet_allocation_method == "sqrt_scale":
+                variance = np.sqrt(self.dft_variance[self.support])
+                normed_scale = variance / variance.mean()
+            elif self.alphabet_allocation_method == "log_scale":
+                variance = np.log2((self.dft_variance[self.support]) + 1)
+                normed_scale = variance / variance.mean()
+
+            self.letter_bits = assign_bits_dynamically(normed_scale, self.bit_budget)
+            self.alphabet_sizes = [
+                int(2 ** self.letter_bits[i]) for i in range(len(self.letter_bits))
+            ]
+        else:
+            # use the same alphabet size for all positions
+            self.alphabet_sizes = [
+                self.alphabet_size for _ in range(self.word_length_actual)
+            ]
+
+        self.alphabet_sizes = np.array(self.alphabet_sizes)
+
         if self.binning_method == "information-gain":
             return self._igb(dft, y)
         if self.binning_method == "information-gain-mae":
@@ -597,30 +651,9 @@ def _k_bins_discretizer(self, dft):
         return breakpoints
 
     def _mcb(self, dft):
-        breakpoints = np.zeros((self.word_length_actual, self.alphabet_size))
-
-        dft = np.round(dft, 2)
-        for letter in range(self.word_length_actual):
-            column = np.sort(dft[:, letter])
-            bin_index = 0
-
-            # use equi-depth binning
-            if self.binning_method == "equi-depth":
-                target_bin_depth = len(dft) / self.alphabet_size
-
-                for bp in range(self.alphabet_size - 1):
-                    bin_index += target_bin_depth
-                    breakpoints[letter, bp] = column[int(bin_index)]
-
-            # use equi-width binning aka equi-frequency binning
-            elif self.binning_method == "equi-width":
-                target_bin_width = (column[-1] - column[0]) / self.alphabet_size
-
-                for bp in range(self.alphabet_size - 1):
-                    breakpoints[letter, bp] = (bp + 1) * target_bin_width + column[0]
-
-        breakpoints[:, self.alphabet_size - 1] = sys.float_info.max
-        return breakpoints
+        return mcb(
+            dft, self.alphabet_sizes, self.word_length_actual, self.binning_method
+        )
 
     def _igb(self, dft, y):
         breakpoints = np.zeros((self.word_length_actual, self.alphabet_size))
@@ -695,7 +728,7 @@ def get_words(self):
         """
         words = np.squeeze(self.words)
         return np.array(
-            [_get_chars(word, self.word_length, self.alphabet_size) for word in words]
+            [_get_chars(word, self.word_length, self.letter_bits) for word in words]
        )
 
     def transform_words(self, X):
@@ -723,7 +756,6 @@
             self.inverse_sqrt_win_size,
             self.lower_bounding or self.lower_bounding_distances,
             self.word_length,
-            self.alphabet_size,
             self.breakpoints,
         )
 
@@ -784,22 +816,21 @@ def __setstate__(self, state):
 
 
 @njit(cache=True, fastmath=True)
-def _get_chars(word, word_length, alphabet_size):
+def _get_chars(word, word_length, letter_bits):
     chars = np.zeros(word_length, dtype=np.uint32)
-    letter_bits = int(np.log2(alphabet_size))
-    mask = (1 << letter_bits) - 1
 
     for i in range(word_length):
         # Extract the last bits
+        mask = (1 << letter_bits[i]) - 1
         char = word & mask
         chars[-i - 1] = char
 
         # Right shift by to move to the next group of bits
-        word >>= letter_bits
+        word >>= letter_bits[i]
 
     return chars
 
 
-@njit(fastmath=True, cache=True)
+@njit(fastmath=True, cache=True, parallel=True)
 def _binning_dft(
     X,
     window_size,
@@ -982,7 +1013,7 @@ def _get_phis(window_size, length):
     return phis
 
 
-@njit(fastmath=True, cache=True)
+@njit(fastmath=True, cache=True, parallel=True)
 def generate_words(
     dfts, bigrams, skip_grams, window_size, breakpoints, word_length, letter_bits
 ):
@@ -995,9 +1026,7 @@
         needed_size += max(0, 2 * dfts.shape[1] - 5 * window_size)
 
     words = np.zeros((dfts.shape[0], needed_size), dtype=np.uint32)
-
-    letter_bits = np.uint32(letter_bits)
-    word_bits = word_length * letter_bits  # dfts.shape[2] * letter_bits
+    word_bits = np.uint32(np.sum(letter_bits))
 
     # special case: binary breakpoints
     if breakpoints.shape[1] == 2:
@@ -1014,7 +1043,7 @@
     for a in prange(dfts.shape[0]):
         for i in range(word_length):  # range(dfts.shape[2]):
             words[a, : dfts.shape[1]] = (
-                words[a, : dfts.shape[1]] << letter_bits
+                words[a, : dfts.shape[1]] << letter_bits[i]
             ) | np.digitize(dfts[a, :, i], breakpoints[i], right=True)
 
     # add bigrams
@@ -1100,7 +1129,6 @@ def _mft(
     )
 
     transformed2 = transformed2 * inverse_sqrt_win_size
-
     if lower_bounding:
         transformed2[:, :, 1::2] = transformed2[:, :, 1::2] * -1
 
@@ -1175,7 +1203,7 @@ def create_feature_names(sfa_words):
     return feature_names
 
 
-@njit(cache=True, fastmath=True)
+@njit(cache=True, fastmath=True)  # does not work with parallel=True ??
 def create_bag_none(
     X_index, breakpoints, n_cases, sfa_words, word_length, remove_repeat_words
 ):
@@ -1223,7 +1251,7 @@ def create_bag_feature_selection(
     return all_win_words, relevant_features
 
 
-@njit(cache=True, fastmath=True)
+@njit(cache=True, fastmath=True, parallel=True)
 def create_bag_transform(
     X_index,
     feature_count,
@@ -1268,12 +1296,45 @@ def create_dict(feature_names, features_idx):
     return relevant_features
 
 
-@njit(fastmath=True, cache=True)
+@njit(fastmath=True, cache=True, parallel=True)
+def mcb(dft, alphabet_sizes, word_length_actual, binning_method):
+    max_alphabet_size = np.max(alphabet_sizes)
+
+    breakpoints = np.zeros((word_length_actual, max_alphabet_size), dtype=np.float32)
+    breakpoints[:, :] = np.finfo(np.float32).max
+    dft = np.round(dft, 2)
+
+    for letter in prange(word_length_actual):
+        curr_alphabet_size = alphabet_sizes[letter]
+        column = np.sort(dft[:, letter])
+        bin_index = 0
+
+        # use equi-depth binning
+        if binning_method == "equi-depth":
+            target_bin_depth = len(dft) / curr_alphabet_size
+
+            for bp in range(curr_alphabet_size - 1):
+                bin_index += target_bin_depth
+                breakpoints[letter, bp] = column[int(bin_index)]
+
+        # use equi-width binning
+        elif binning_method == "equi-width":
+            target_bin_width = (column[-1] - column[0]) / curr_alphabet_size
+
+            for bp in range(curr_alphabet_size - 1):
+                breakpoints[letter, bp] = (bp + 1) * target_bin_width + column[0]
+
+    return breakpoints
+
+
+@njit(fastmath=True, cache=True, parallel=True)
 def shorten_words(words, amount, letter_bits):
     new_words = np.zeros((words.shape[0], words.shape[1]), dtype=np.uint32)
 
     # Unigrams
-    shift_len = amount * letter_bits
+    shift_len = np.sum(
+        letter_bits[:amount]
+    )  # this does not work for variable-alphabet size
     for j in prange(words.shape[1]):
         # shorten a word by set amount of letters
         new_words[:, j] = words[:, j] >> shift_len
@@ -1300,7 +1361,6 @@ def _transform_words_case(
     inverse_sqrt_win_size,
     lower_bounding,
     word_length,
-    alphabet_size,
     breakpoints,
 ):
     dfts = _mft(
@@ -1320,9 +1380,50 @@
     for x in prange(dfts.shape[0]):
         for window in prange(dfts.shape[1]):
             for i in prange(word_length):
-                for bp in range(alphabet_size):
+                for bp in range(breakpoints.shape[1]):
                     if dfts[x, window, i] <= breakpoints[i][bp]:
                         words[x, window, i] = bp
                         break
 
     return words, dfts
+
+
+@njit(fastmath=True, cache=True)
+def assign_bits_dynamically(variance, budget, max_bit_val=9):
+    """Assign bits dynamically based on variance and budget.
+
+    The goal is to maximize the variance covered by each symbol.
+
+    Parameters
+    ----------
+    variance : 1d numpy array
+        the variance for each position.
+    budget : float
+        the maximal number of bits to assign.
+    max_bit_val : int, optional, default=9
+        the maximum number of bits that can be assigned to a single position.
+
+    Returns
+    -------
+    bit_array : 1d numpy array
+        the number of bits assigned to each position.
+    """
+    bit_array = np.zeros(len(variance), dtype=np.uint32)
+    bit_array[:] = 0
+
+    improve = variance.copy() / 2.0
+
+    # assign bits to positions
+    current_sum = bit_array.sum()
+    while current_sum < budget:
+        best_pos = np.argmax(improve)
+        bit_array[best_pos] += 1
+        current_sum += 1
+
+        # recalculate the improvement
+        improve[best_pos] = variance[best_pos] / (2 ** (bit_array[best_pos] + 1))
+
+        if bit_array[best_pos] == max_bit_val:
+            improve[best_pos] = 0
+
+    return bit_array
diff --git a/aeon/transformations/collection/dictionary_based/_sfa_whole.py b/aeon/transformations/collection/dictionary_based/_sfa_whole.py
index 2e9f3df86a..0f7e4aad2a 100644
--- a/aeon/transformations/collection/dictionary_based/_sfa_whole.py
+++ b/aeon/transformations/collection/dictionary_based/_sfa_whole.py
@@ -14,12 +14,14 @@ class SFAWhole(SFAFast):
     """Symbolic Fourier Approximation (SFA) Transformer.
 
     A whole series transform for SFA which holds the lower bounding lemma, see [1].
+    SOFA [2] extends SFA by introducing variance-based feature selection and dynamic
+    alphabet sizes. It produces a significantly tighter lower bound.
 
     It is implemented as a wrapper for the SFA-Fast transformer, the latter
     implements subsequence-based SFA extraction.
 
     This wrapper reduces non-needed parameters, and sets some usefull defaults for
-    lower bounding.
+    the tightest possible lower bounding.
 
     Parameters
     ----------
@@ -27,9 +29,12 @@ class SFAWhole(SFAFast):
         Length of word to shorten window to (using DFT).
     alphabet_size : int, default = 4
         Number of values to discretise each value to.
+    alphabet_allocation_method : str, default = linear_scale
+        The method used to learn the dynamic alphabet sizes. One of
+        {"linear_scale", "log_scale", "sqrt_scale"}.
     norm : boolean, default = False
         Mean normalise words by dropping first fourier coefficient.
-    binning_method : str, default="equi-depth"
+    binning_method : str, default="equi-width"
         The binning method used to derive the breakpoints. One of {"equi-depth",
         "equi-width", "information-gain", "information-gain-mae", "kmeans", "quantile"},
     variance : boolean, default = False
@@ -51,9 +56,12 @@ class SFAWhole(SFAFast):
 
     References
    ----------
-    .. [1] Schäfer, Patrick, and Mikael Högqvist. "SFA: a symbolic fourier approximation
+    .. [1] P. Schäfer and M. Högqvist. "SFA: a symbolic fourier approximation
     and index for similarity search in high dimensional datasets." Proceedings of
     the 15th international conference on extending database technology. 2012.
+    .. [2] P. Schäfer, J. Brand, U. Leser, B. Peng and T. Palpanas, "Fast and Exact
+    Similarity Search in Less than a Blink of an Eye," in 2025 IEEE 41st International
+    Conference on Data Engineering (ICDE), Hong Kong, 2025, pp. 2464-2477.
     """
 
     _tags = {
@@ -66,8 +74,9 @@ def __init__(
         self,
         word_length=8,
         alphabet_size=4,
+        alphabet_allocation_method="linear_scale",
         norm=True,
-        binning_method="equi-depth",
+        binning_method="equi-width",
         variance=True,
         sampling_factor=None,
         random_state=None,
@@ -77,6 +86,7 @@
             word_length=word_length,
             alphabet_size=alphabet_size,
             norm=norm,
+            alphabet_allocation_method=alphabet_allocation_method,
             binning_method=binning_method,
             variance=variance,
             sampling_factor=sampling_factor,
@@ -92,7 +102,7 @@
             skip_grams=False,
             remove_repeat_words=False,
             return_sparse=False,
-            window_size=None,  # set in fit
+            window_size=None,  # set in fit - do not remove
         )
 
     def _fit_transform(self, X, y=None):