8
8
import sys
9
9
import time
10
10
from copy import deepcopy
11
+ from typing import TYPE_CHECKING
11
12
12
13
import numpy as np
13
14
from sklearn .utils import check_random_state
14
15
16
+ from aeon .networks import BaseDeepLearningNetwork
15
17
from aeon .transformations .collection import BaseCollectionTransformer
18
+ from aeon .utils .self_supervised .general import z_normalization
19
+
20
+ if TYPE_CHECKING :
21
+ from tensorflow .keras .callbacks import Callback
22
+ from tensorflow .keras .optimizers import Optimizer
16
23
17
24
18
25
class TRILITE (BaseCollectionTransformer ):
@@ -32,39 +39,39 @@ class TRILITE(BaseCollectionTransformer):
32
39
33
40
Parameters
34
41
----------
35
- alpha: float, default = 1e-2
42
+ alpha : float, default = 1e-2
36
43
The value that controls the space of the triplet loss,
37
44
the smaller the value the more difficult the problem
38
45
becomes, the higher the value the more easy the problem
39
46
becomes, a balance should be found.
40
- weight_ref_min: float, default = 0.6
47
+ weight_ref_min : float, default = 0.6
41
48
The weight of the reference series used for the triplet
42
49
generation.
43
- percentage_mask_length: int, default = 0.2
50
+ percentage_mask_length : int, default = 0.2
44
51
The percentage of time series length to calculate
45
52
the length of the masking used for the triplet
46
53
generation. Default is 20%.
47
- use_mixing_up: bool, default = True
54
+ use_mixing_up : bool, default = True
48
55
Wether or not to use mixing up during the triplet
49
56
generation phase.
50
- use_masking: bool, default = True
57
+ use_masking : bool, default = True
51
58
Whether or not to use masking during the triplet
52
59
generation phase.
53
- znormalize_pos_neg : bool, default = True
54
- Whether or not to znormalize (mean 0 and std 1)
60
+ z_normalize_pos_neg : bool, default = True
61
+ Whether or not to z_normalize (mean 0 and std 1)
55
62
pos and neg samples after generating the triplet.
56
- backbone_network: aeon Network, default = None
63
+ backbone_network : aeon Network, default = None
57
64
The backbone network used for the SSL model,
58
65
it can be any network from the aeon.networks
59
66
module on condition for it's structure to be
60
67
configured as "encoder", see _config attribute.
61
68
For TRILITE, the default network used is
62
69
FCNNetwork.
63
- latent_space_dim: int, default = 128
70
+ latent_space_dim : int, default = 128
64
71
The size of the latent space, applied using a
65
72
fully connected layer at the end of the network's
66
73
output.
67
- latent_space_activation: str, default = "linear"
74
+ latent_space_activation : str, default = "linear"
68
75
The activation to control the range of values
69
76
of the latent space.
70
77
random_state : int, RandomState instance or None, default=None
@@ -151,36 +158,36 @@ class TRILITE(BaseCollectionTransformer):
151
158
152
159
def __init__ (
153
160
self ,
154
- alpha = 1e-2 ,
155
- weight_ref_min = 0.6 ,
156
- percentage_mask_length = 0.2 ,
157
- use_mixing_up = True ,
158
- use_masking = True ,
159
- znormalize_pos_neg = True ,
160
- backbone_network = None ,
161
- latent_space_dim = 128 ,
162
- latent_space_activation = "linear" ,
163
- random_state = None ,
164
- verbose = False ,
165
- optimizer = None ,
166
- file_path = "./" ,
167
- save_best_model = False ,
168
- save_last_model = False ,
169
- save_init_model = False ,
170
- best_file_name = "best_model" ,
171
- last_file_name = "last_model" ,
172
- init_file_name = "init_model" ,
173
- callbacks = None ,
174
- batch_size = 64 ,
175
- use_mini_batch_size = False ,
176
- n_epochs = 2000 ,
161
+ alpha : float = 1e-2 ,
162
+ weight_ref_min : float = 0.6 ,
163
+ percentage_mask_length : float = 0.2 ,
164
+ use_mixing_up : bool = True ,
165
+ use_masking : bool = True ,
166
+ z_normalize_pos_neg : bool = True ,
167
+ backbone_network : BaseDeepLearningNetwork = None ,
168
+ latent_space_dim : int = 128 ,
169
+ latent_space_activation : str = "linear" ,
170
+ random_state : int | np . random . RandomState | None = None ,
171
+ verbose : bool = False ,
172
+ optimizer : Optimizer | None = None ,
173
+ file_path : str = "./" ,
174
+ save_best_model : bool = False ,
175
+ save_last_model : bool = False ,
176
+ save_init_model : bool = False ,
177
+ best_file_name : str = "best_model" ,
178
+ last_file_name : str = "last_model" ,
179
+ init_file_name : str = "init_model" ,
180
+ callbacks : Callback | list [ Callback ] | None = None ,
181
+ batch_size : int = 64 ,
182
+ use_mini_batch_size : bool = False ,
183
+ n_epochs : int = 2000 ,
177
184
):
178
185
self .alpha = alpha
179
186
self .weight_ref_min = weight_ref_min
180
187
self .percentage_mask_length = percentage_mask_length
181
188
self .use_mixing_up = use_mixing_up
182
189
self .use_masking = use_masking
183
- self .znormalize_pos_neg = znormalize_pos_neg
190
+ self .z_normalize_pos_neg = z_normalize_pos_neg
184
191
self .backbone_network = backbone_network
185
192
self .latent_space_dim = latent_space_dim
186
193
self .latent_space_activation = latent_space_activation
@@ -201,7 +208,7 @@ def __init__(
201
208
202
209
super ().__init__ ()
203
210
204
- def _fit (self , X , y = None ):
211
+ def _fit (self , X : np . ndarray , y = None ):
205
212
"""Fit the SSL model on X, y is ignored.
206
213
207
214
Parameters
@@ -216,7 +223,7 @@ def _fit(self, X, y=None):
216
223
"""
217
224
import tensorflow as tf
218
225
219
- from aeon .networks import BaseDeepLearningNetwork , FCNNetwork
226
+ from aeon .networks import FCNNetwork
220
227
221
228
if isinstance (self .backbone_network , BaseDeepLearningNetwork ):
222
229
self ._backbone_network = deepcopy (self .backbone_network )
@@ -374,7 +381,7 @@ def build_model(self, input_shape):
374
381
375
382
Parameters
376
383
----------
377
- input_shape : tuple
384
+ input_shape : tuple[int, int]
378
385
The shape of the data fed into the input layer, should be (m, d).
379
386
380
387
Returns
@@ -453,32 +460,41 @@ def _triplet_generation(self, X):
453
460
n_channels = int (X .shape [- 1 ])
454
461
length_TS = int (X .shape [1 ])
455
462
463
+ # define mask length
456
464
self .mask_length = int (length_TS * self .percentage_mask_length )
457
465
466
+ # define weight for each sample in the mixing up
458
467
w_ref = np .random .choice (
459
468
np .linspace (start = self .weight_ref_min , stop = 1 , num = 1000 ), size = 1
460
469
)
461
470
w_ts = (1 - w_ref ) / 2
462
471
472
+ # define your ref as random permutation of X
463
473
ref = np .random .permutation (X [:])
464
474
465
475
n = int (ref .shape [0 ])
466
476
477
+ # define positive and negative sample arrays
467
478
_pos = np .zeros (shape = ref .shape )
468
479
_neg = np .zeros (shape = ref .shape )
469
480
470
481
all_indices = np .arange (start = 0 , stop = n )
471
482
472
483
for i_ref in range (n ):
484
+ # remove the sample ref from the random choice of pos-neg
473
485
all_indices_without_ref = np .delete (arr = all_indices , obj = i_ref )
486
+
487
+ # choose a random sample used for the negative generation
474
488
index_neg = int (np .random .choice (all_indices_without_ref , size = 1 ))
475
489
476
490
_ref = ref [i_ref ].copy ()
477
491
492
+ # remove the index_neg from choices
478
493
all_indices_without_ref_and_not_ref = np .delete (
479
494
arr = all_indices , obj = [i_ref , index_neg ]
480
495
)
481
496
497
+ # choose samples used for the mixing up
482
498
index_ts1_pos = int (
483
499
np .random .choice (all_indices_without_ref_and_not_ref , size = 1 )
484
500
)
@@ -504,9 +520,12 @@ def _triplet_generation(self, X):
504
520
# MixingUp
505
521
506
522
if self .use_mixing_up and self .use_masking :
523
+ # mix up the selected series with ref to obtain pos
507
524
_pos [i_ref ] = w_ref * _ref + w_ts * _ts1_pos + w_ts * _ts2_pos
525
+ # mix up the selected series with neg ref to obtain neg
508
526
_neg [i_ref ] = w_ref * _not_ref + w_ts * _ts1_neg + w_ts * _ts2_neg
509
527
528
+ # apply masking
510
529
_pos [i_ref ], _neg [i_ref ] = self ._apply_masking (
511
530
pos = _pos [i_ref ],
512
531
neg = _neg [i_ref ],
@@ -516,10 +535,13 @@ def _triplet_generation(self, X):
516
535
)
517
536
518
537
elif self .use_mixing_up and not self .use_masking :
538
+ # mix up the selected series with ref to obtain pos
519
539
_pos [i_ref ] = w_ref * _ref + w_ts * _ts1_pos + w_ts * _ts2_pos
540
+ # mix up the selected series with neg ref to obtain neg
520
541
_neg [i_ref ] = w_ref * _not_ref + w_ts * _ts1_neg + w_ts * _ts2_neg
521
542
522
543
elif self .use_masking and not self .use_mixing_up :
544
+ # apply masking
523
545
_pos [i_ref ], _neg [i_ref ] = self ._apply_masking (
524
546
pos = _pos [i_ref ],
525
547
neg = _neg [i_ref ],
@@ -534,29 +556,38 @@ def _triplet_generation(self, X):
534
556
"should be chosen to generate" ,
535
557
"the triplets." ,
536
558
)
537
- if self .znormalize_pos_neg :
538
- _pos_normalized = self ._znormalization (_pos )
539
- _neg_normalized = self ._znormalization (_neg )
559
+ if self .z_normalize_pos_neg :
560
+ # z_normalize pos and neg
561
+ _pos_normalized = z_normalization (_pos )
562
+ _neg_normalized = z_normalization (_neg )
540
563
541
564
return ref , _pos_normalized , _neg_normalized
542
565
else :
543
566
return ref , _pos , _neg
544
567
545
568
def _apply_masking (self , pos , neg , n_channels , length_TS , mask_length ):
546
569
"""Apply masking phase on pos and neg."""
570
+ # select a random start for the mask
547
571
start_mask = int (np .random .randint (low = 0 , high = length_TS - mask_length , size = 1 ))
548
572
stop_mask = start_mask + mask_length
549
573
574
+ # define noise on replacement on the left side of the mask
550
575
noise_pos_left = np .random .random (size = (start_mask , n_channels ))
576
+ # normalize noise
551
577
noise_pos_left /= 5
552
578
noise_pos_left -= 0.1
579
+
580
+ # define noise on replacement on the left side of the mask
553
581
noise_pos_right = np .random .random (size = (length_TS - stop_mask , n_channels ))
582
+ # normalize noise
554
583
noise_pos_right /= 5
555
584
noise_pos_right -= 0.1
556
585
586
+ # replace left and right side of the mask by normalized noise
557
587
pos [0 :start_mask , :] = noise_pos_left
558
588
pos [stop_mask :length_TS , :] = noise_pos_right
559
589
590
+ # repeat the same procedure for the negative sample
560
591
noise_neg_left = np .random .random (size = (start_mask , n_channels ))
561
592
noise_neg_left /= 5
562
593
noise_neg_left -= 0.1
@@ -569,13 +600,6 @@ def _apply_masking(self, pos, neg, n_channels, length_TS, mask_length):
569
600
570
601
return pos , neg
571
602
572
- def _znormalization (self , X ):
573
- stds = np .std (X , axis = 1 , keepdims = True )
574
- if len (stds [stds == 0.0 ]) > 0 :
575
- stds [stds == 0.0 ] = 1.0
576
- return (X - X .mean (axis = 1 , keepdims = True )) / stds
577
- return (X - X .mean (axis = 1 , keepdims = True )) / (X .std (axis = 1 , keepdims = True ))
578
-
579
603
def save_last_model_to_file (self , file_path = "./" ):
580
604
"""Save the last epoch of the trained deep learning model.
581
605
0 commit comments