 import numpy as np
+import pandas as pd
+from pathlib import Path
+
 from ads.opctl import logger
-from ads.opctl.operator.lowcode.common.utils import (
-    find_output_dirname,
-)
 from ads.opctl.operator.lowcode.common.const import DataColumns
 from .model.forecast_datasets import ForecastDatasets
 from .operator_config import ForecastOperatorConfig
-from pathlib import Path
-import pandas as pd
+


 class ModelEvaluator:
     def __init__(self, models, k=5, subsample_ratio=0.20):
@@ -31,7 +30,9 @@ def generate_cutoffs(self, unique_dates, horizon):
         cut_offs = sorted_dates[-horizon - 1:-horizon * (self.k + 1):-horizon][:len(valid_train_window_size)]
         return cut_offs

-    def generate_k_fold_data(self, datasets: ForecastDatasets, date_col: str, horizon: int):
+    def generate_k_fold_data(self, datasets: ForecastDatasets, operator_config: ForecastOperatorConfig):
+        date_col = operator_config.spec.datetime_column.name
+        horizon = operator_config.spec.horizon
         historical_data = datasets.historical_data.data.reset_index()
         series_col = DataColumns.Series
         group_counts = historical_data[series_col].value_counts()
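The backward slice in `generate_cutoffs` is easy to misread, so here is a minimal sketch of the same arithmetic on stand-in data (this assumes `sorted_dates` is in ascending order, which the hunk above does not show):

```python
import numpy as np

horizon, k = 3, 5
sorted_dates = np.arange(1, 21)  # stand-in for 20 ascending unique dates

# Same slice as generate_cutoffs: start one horizon back from the end,
# then step backward one horizon at a time, for at most k cutoffs.
cut_offs = sorted_dates[-horizon - 1:-horizon * (k + 1):-horizon]
print(cut_offs)  # [17 14 11  8  5], newest cutoff first
```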
@@ -51,63 +52,80 @@ def generate_k_fold_data(self, datasets: ForecastDatasets, date_col: str, horizo
         for i, current in enumerate(cut_offs[1:]):
             test_datasets.append(sampled_historical_data[(current < sampled_historical_data[date_col]) & (
                     sampled_historical_data[date_col] <= cut_offs[i])])
-        return cut_offs, training_datasets, test_datasets
+        all_additional = datasets.additional_data.data.reset_index()
+        sampled_additional_data = all_additional[all_additional[series_col].isin(sampled_groups.index)]
+        max_historical_date = sampled_historical_data[date_col].max()
+        additional_data = [sampled_additional_data[sampled_additional_data[date_col] <= max_historical_date]]
+        for cut_off in cut_offs[:-1]:
+            trimmed_additional_data = sampled_additional_data[sampled_additional_data[date_col] <= cut_off]
+            additional_data.append(trimmed_additional_data)
+        return cut_offs, training_datasets, additional_data, test_datasets
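The added lines build one covariate frame per fold with the same `<=` mask used for the historical data. Note that `additional_data[i]` is trimmed at a later boundary than `train_sets[i]` (see the pairing in `run_all_models` below), which appears intended so future regressor values stay available over each fold's test window. A minimal sketch of the trimming, with illustrative column names:

```python
import pandas as pd

dates = pd.date_range("2024-01-01", periods=10, freq="D")
extra = pd.DataFrame({"ds": dates, "promo": [0, 1] * 5})

cut_off = dates[6]
trimmed = extra[extra["ds"] <= cut_off]  # keeps rows 0..6, drops the rest
```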

     def remove_none_values(self, obj):
         if isinstance(obj, dict):
             return {k: self.remove_none_values(v) for k, v in obj.items() if k is not None and v is not None}
         else:
             return obj
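Note that `remove_none_values` only recurses into dicts, so a `None` nested inside a list would survive; for the spec dicts it is applied to below, that is sufficient. Given any `ModelEvaluator` instance `evaluator`:

```python
evaluator.remove_none_values({"a": 1, "b": None, "c": {"d": None}})
# -> {'a': 1, 'c': {}}  ("b" and "c.d" are dropped; the emptied dict remains)
```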

-    def create_operator_config(self, operator_config, backtest, model, historical_data, test_data):
-        output_dir = find_output_dirname(operator_config.spec.output_directory)
-        output_file_path = f'{output_dir}back_testing/{model}/{backtest}'
+    def create_operator_config(self, operator_config, backtest, model, historical_data, additional_data, test_data):
+        output_dir = operator_config.spec.output_directory.url
+        output_file_path = f'{output_dir}/back_testing/{model}/{backtest}'
         Path(output_file_path).mkdir(parents=True, exist_ok=True)
         historical_data_url = f'{output_file_path}/historical.csv'
+        additional_data_url = f'{output_file_path}/additional.csv'
         test_data_url = f'{output_file_path}/test.csv'
         historical_data.to_csv(historical_data_url, index=False)
+        additional_data.to_csv(additional_data_url, index=False)
         test_data.to_csv(test_data_url, index=False)
         backtest_op_config_draft = operator_config.to_dict()
         backtest_spec = backtest_op_config_draft["spec"]
         backtest_spec["historical_data"]["url"] = historical_data_url
+        backtest_spec["additional_data"]["url"] = additional_data_url
         backtest_spec["test_data"]["url"] = test_data_url
         backtest_spec["model"] = model
-        backtest_spec["output_directory"]["url"] = output_file_path
+        backtest_spec["output_directory"] = {"url": output_file_path}
         backtest_spec["target_category_columns"] = [DataColumns.Series]
-        backtest_spec.pop('additional_data', None)  # todo create additional data
+        backtest_spec['generate_explanations'] = False
         cleaned_config = self.remove_none_values(backtest_op_config_draft)

         backtest_op_config = ForecastOperatorConfig.from_dict(
             obj_dict=cleaned_config)
         return backtest_op_config
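After these rewrites, the spec handed to each backtest run looks roughly like this (model name and paths are illustrative; the remaining keys are carried over from the parent config):

```python
backtest_spec = {
    "historical_data": {"url": ".../back_testing/prophet/0/historical.csv"},
    "additional_data": {"url": ".../back_testing/prophet/0/additional.csv"},
    "test_data": {"url": ".../back_testing/prophet/0/test.csv"},
    "model": "prophet",
    "output_directory": {"url": ".../back_testing/prophet/0"},
    "target_category_columns": [DataColumns.Series],
    "generate_explanations": False,
}
```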

     def run_all_models(self, datasets: ForecastDatasets, operator_config: ForecastOperatorConfig):
-        date_col = operator_config.spec.datetime_column.name
-        horizon = operator_config.spec.horizon
-        cut_offs, train_sets, test_sets = self.generate_k_fold_data(datasets, date_col, horizon)
+        cut_offs, train_sets, additional_data, test_sets = self.generate_k_fold_data(datasets, operator_config)
         metrics = {}
         for model in self.models:
             from .model.factory import ForecastOperatorModelFactory
             metrics[model] = {}
             for i in range(len(cut_offs)):
                 backtest_historical_data = train_sets[i]
+                backtest_additional_data = additional_data[i]
                 backtest_test_data = test_sets[i]
                 backtest_operator_config = self.create_operator_config(operator_config, i, model,
                                                                        backtest_historical_data,
+                                                                       backtest_additional_data,
                                                                        backtest_test_data)
                 datasets = ForecastDatasets(backtest_operator_config)
                 ForecastOperatorModelFactory.get_model(
                     backtest_operator_config, datasets
                 ).generate_report()
-                metrics_df = pd.read_csv(f"{backtest_operator_config.spec.output_directory.url}/metrics.csv")
-                metrics_df["average_accross_series"] = metrics_df.drop('metrics', axis=1).mean(axis=1)
-                metrics_average_dict = dict(zip(metrics_df['metrics'].str.lower(), metrics_df['average_accross_series']))
+                test_metrics_filename = backtest_operator_config.spec.test_metrics_filename
+                metrics_df = pd.read_csv(
+                    f"{backtest_operator_config.spec.output_directory.url}/{test_metrics_filename}")
+                metrics_df["average_across_series"] = metrics_df.drop('metrics', axis=1).mean(axis=1)
+                metrics_average_dict = dict(zip(metrics_df['metrics'].str.lower(), metrics_df['average_across_series']))
                 metrics[model][i] = metrics_average_dict[operator_config.spec.metric]
         return metrics
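The return value is a nested mapping, model name to backtest index to the configured metric averaged across series, for example (model names and scores illustrative):

```python
metrics = {
    "prophet": {0: 12.4, 1: 11.9, 2: 13.1},
    "arima": {0: 14.0, 1: 13.2, 2: 12.8},
}
```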

     def find_best_model(self, datasets: ForecastDatasets, operator_config: ForecastOperatorConfig):
         metrics = self.run_all_models(datasets, operator_config)
-        avg_backtests_metrics = {key: sum(value.values()) / len(value.values()) for key, value in metrics.items()}
+        avg_backtests_metrics = {key: sum(value.values()) / len(value.values()) for key, value in metrics.items()}
         best_model = min(avg_backtests_metrics, key=avg_backtests_metrics.get)
         logger.info(f"Among models {self.models}, {best_model} model shows better performance during backtesting.")
+        backtest_stats = pd.DataFrame(metrics).rename_axis('backtest')
+        backtest_stats.reset_index(inplace=True)
+        output_dir = operator_config.spec.output_directory.url
+        backtest_report_name = "backtest_stats.csv"
+        backtest_stats.to_csv(f"{output_dir}/{backtest_report_name}", index=False)
         return best_model
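A minimal usage sketch, assuming a populated `ForecastDatasets` / `ForecastOperatorConfig` pair and candidate model names the factory supports (the names here are illustrative):

```python
evaluator = ModelEvaluator(models=["prophet", "arima"], k=5, subsample_ratio=0.20)
best = evaluator.find_best_model(datasets, operator_config)
# best is the model with the lowest average backtest metric (lower is assumed
# better); per-fold scores are written to <output_dir>/backtest_stats.csv.
```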