From 1b9337b5ac02caca99452ecfe3083c002b87427d Mon Sep 17 00:00:00 2001 From: Allen Hosler Date: Thu, 20 Feb 2025 17:29:18 +0000 Subject: [PATCH 01/18] Adding automlx to model list --- docs/source/user_guide/operators/forecast_operator/index.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/user_guide/operators/forecast_operator/index.rst b/docs/source/user_guide/operators/forecast_operator/index.rst index 60993863f..09b4c69f9 100644 --- a/docs/source/user_guide/operators/forecast_operator/index.rst +++ b/docs/source/user_guide/operators/forecast_operator/index.rst @@ -31,6 +31,7 @@ There is no perfect model. A core feature of the Operator is the ability to sele - **Prophet** - **ARIMA** +- **AutoMLx** - **MLForecast** - **NeuralProphet** - **AutoTS** From bac5918ff8be74a1aa9e0a9997179e0a5b10ac29 Mon Sep 17 00:00:00 2001 From: Allen Date: Fri, 14 Mar 2025 21:15:46 +0000 Subject: [PATCH 02/18] add support for min and max n prophet --- .../lowcode/anomaly/model/base_model.py | 6 +- .../lowcode/anomaly/model/randomcutforest.py | 2 +- ads/opctl/operator/lowcode/anomaly/utils.py | 2 +- .../lowcode/common/transformations.py | 6 +- ads/opctl/operator/lowcode/common/utils.py | 4 +- .../operator/lowcode/forecast/model/arima.py | 20 +- .../lowcode/forecast/model/automlx.py | 14 +- .../operator/lowcode/forecast/model/autots.py | 12 +- .../lowcode/forecast/model/base_model.py | 28 ++- .../forecast/model/forecast_datasets.py | 20 +- .../lowcode/forecast/model/ml_forecast.py | 4 +- .../lowcode/forecast/model/neuralprophet.py | 22 +- .../lowcode/forecast/model/prophet.py | 33 ++- .../lowcode/forecast/model_evaluator.py | 190 +++++++++++++----- .../whatifserve/deployment_manager.py | 161 +++++++++------ .../operators/forecast_operator/index.rst | 23 +++ tests/operators/forecast/test_errors.py | 54 ++++- 17 files changed, 418 insertions(+), 183 deletions(-) diff --git a/ads/opctl/operator/lowcode/anomaly/model/base_model.py b/ads/opctl/operator/lowcode/anomaly/model/base_model.py index 5ee1ca36f..4f28bb8d2 100644 --- a/ads/opctl/operator/lowcode/anomaly/model/base_model.py +++ b/ads/opctl/operator/lowcode/anomaly/model/base_model.py @@ -71,7 +71,7 @@ def generate_report(self): try: anomaly_output = self._build_model() except Exception as e: - logger.warn(f"Found exception: {e}") + logger.warning(f"Found exception: {e}") if self.spec.datetime_column: anomaly_output = self._fallback_build_model() raise e @@ -347,7 +347,7 @@ def _save_report( storage_options=storage_options, ) - logger.warn( + logger.warning( f"The report has been successfully " f"generated and placed to the: {unique_output_dir}." ) @@ -356,7 +356,7 @@ def _fallback_build_model(self): """ Fallback method for the sub model _build_model method. """ - logger.warn( + logger.warning( f"The build_model method has failed for the model: {self.spec.model}. " "A fallback model will be built." ) diff --git a/ads/opctl/operator/lowcode/anomaly/model/randomcutforest.py b/ads/opctl/operator/lowcode/anomaly/model/randomcutforest.py index ad34159ab..99903953a 100644 --- a/ads/opctl/operator/lowcode/anomaly/model/randomcutforest.py +++ b/ads/opctl/operator/lowcode/anomaly/model/randomcutforest.py @@ -95,7 +95,7 @@ def _build_model(self) -> AnomalyOutput: anomaly_output.add_output(target, anomaly, score) except Exception as e: - logger.warn(f"Encountered Error: {e}. Skipping series {target}.") + logger.warning(f"Encountered Error: {e}. 
Skipping series {target}.") return anomaly_output diff --git a/ads/opctl/operator/lowcode/anomaly/utils.py b/ads/opctl/operator/lowcode/anomaly/utils.py index 902e7f186..396d34c57 100644 --- a/ads/opctl/operator/lowcode/anomaly/utils.py +++ b/ads/opctl/operator/lowcode/anomaly/utils.py @@ -44,7 +44,7 @@ def _build_metrics_df(y_true, y_pred, column_name): # Throws exception if y_true has only one class metrics[SupportedMetrics.ROC_AUC] = roc_auc_score(y_true, y_pred) except Exception as e: - logger.warn(f"An exception occurred: {e}") + logger.warning(f"An exception occurred: {e}") metrics[SupportedMetrics.ROC_AUC] = None precision, recall, thresholds = precision_recall_curve(y_true, y_pred) metrics[SupportedMetrics.PRC_AUC] = auc(recall, precision) diff --git a/ads/opctl/operator/lowcode/common/transformations.py b/ads/opctl/operator/lowcode/common/transformations.py index ca3a05d83..66deb6546 100644 --- a/ads/opctl/operator/lowcode/common/transformations.py +++ b/ads/opctl/operator/lowcode/common/transformations.py @@ -98,7 +98,11 @@ def run(self, data): return clean_df def _remove_trailing_whitespace(self, df): - return df.apply(lambda x: x.str.strip() if x.dtype == "object" else x) + return df.apply( + lambda x: x.str.strip() + if hasattr(x, "dtype") and x.dtype == "object" + else x + ) def _clean_column_names(self, df): """ diff --git a/ads/opctl/operator/lowcode/common/utils.py b/ads/opctl/operator/lowcode/common/utils.py index b0193bfd0..818dccb5d 100644 --- a/ads/opctl/operator/lowcode/common/utils.py +++ b/ads/opctl/operator/lowcode/common/utils.py @@ -3,6 +3,7 @@ # Copyright (c) 2024, 2025 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ +import json import logging import os import shutil @@ -12,7 +13,6 @@ import fsspec import oracledb -import json import pandas as pd from ads.common.object_storage_details import ObjectStorageDetails @@ -265,7 +265,7 @@ def find_output_dirname(output_dir: OutputDirectory): while os.path.exists(unique_output_dir): unique_output_dir = f"{output_dir}_{counter}" counter += 1 - logger.warn( + logger.warning( f"Since the output directory was not specified, the output will be saved to {unique_output_dir} directory." ) return unique_output_dir diff --git a/ads/opctl/operator/lowcode/forecast/model/arima.py b/ads/opctl/operator/lowcode/forecast/model/arima.py index f3bd57657..6639ba097 100644 --- a/ads/opctl/operator/lowcode/forecast/model/arima.py +++ b/ads/opctl/operator/lowcode/forecast/model/arima.py @@ -137,8 +137,8 @@ def _train_model(self, i, s_id, df, model_kwargs): "error": str(e), "error_trace": traceback.format_exc(), } - logger.warn(f"Encountered Error: {e}. Skipping.") - logger.warn(traceback.format_exc()) + logger.warning(f"Encountered Error: {e}. 
Skipping.") + logger.warning(traceback.format_exc()) def _build_model(self) -> pd.DataFrame: full_data_dict = self.datasets.get_data_by_series() @@ -166,7 +166,7 @@ def _generate_report(self): sec5_text = rc.Heading("ARIMA Model Parameters", level=2) blocks = [ rc.Html( - m['model'].summary().as_html(), + m["model"].summary().as_html(), label=s_id if self.target_cat_col else None, ) for i, (s_id, m) in enumerate(self.models.items()) @@ -201,11 +201,15 @@ def _generate_report(self): self.formatted_local_explanation = aggregate_local_explanations if not self.target_cat_col: - self.formatted_global_explanation = self.formatted_global_explanation.rename( - {"Series 1": self.original_target_column}, - axis=1, + self.formatted_global_explanation = ( + self.formatted_global_explanation.rename( + {"Series 1": self.original_target_column}, + axis=1, + ) + ) + self.formatted_local_explanation.drop( + "Series", axis=1, inplace=True ) - self.formatted_local_explanation.drop("Series", axis=1, inplace=True) # Create a markdown section for the global explainability global_explanation_section = rc.Block( @@ -235,7 +239,7 @@ def _generate_report(self): local_explanation_section, ] except Exception as e: - logger.warn(f"Failed to generate Explanations with error: {e}.") + logger.warning(f"Failed to generate Explanations with error: {e}.") logger.debug(f"Full Traceback: {traceback.format_exc()}") model_description = rc.Text( diff --git a/ads/opctl/operator/lowcode/forecast/model/automlx.py b/ads/opctl/operator/lowcode/forecast/model/automlx.py index 8d0d4e4be..4628e12e7 100644 --- a/ads/opctl/operator/lowcode/forecast/model/automlx.py +++ b/ads/opctl/operator/lowcode/forecast/model/automlx.py @@ -189,8 +189,8 @@ def _build_model(self) -> pd.DataFrame: "error": str(e), "error_trace": traceback.format_exc(), } - logger.warn(f"Encountered Error: {e}. Skipping.") - logger.warn(traceback.format_exc()) + logger.warning(f"Encountered Error: {e}. Skipping.") + logger.warning(traceback.format_exc()) logger.debug("===========Forecast Generated===========") @@ -257,7 +257,9 @@ def _generate_report(self): ) self.formatted_global_explanation.rename( - columns={self.spec.datetime_column.name: ForecastOutputColumns.DATE}, + columns={ + self.spec.datetime_column.name: ForecastOutputColumns.DATE + }, inplace=True, ) @@ -312,7 +314,7 @@ def _generate_report(self): local_explanation_section, ] except Exception as e: - logger.warn(f"Failed to generate Explanations with error: {e}.") + logger.warning(f"Failed to generate Explanations with error: {e}.") logger.debug(f"Full Traceback: {traceback.format_exc()}") model_description = rc.Text( @@ -478,7 +480,9 @@ def explain_model(self): except Exception as e: if s_id in self.errors_dict: self.errors_dict[s_id]["explainer_error"] = str(e) - self.errors_dict[s_id]["explainer_error_trace"] = traceback.format_exc() + self.errors_dict[s_id]["explainer_error_trace"] = ( + traceback.format_exc() + ) else: self.errors_dict[s_id] = { "model_name": self.spec.model, diff --git a/ads/opctl/operator/lowcode/forecast/model/autots.py b/ads/opctl/operator/lowcode/forecast/model/autots.py index 16e69c356..65af79cc5 100644 --- a/ads/opctl/operator/lowcode/forecast/model/autots.py +++ b/ads/opctl/operator/lowcode/forecast/model/autots.py @@ -211,8 +211,8 @@ def _build_model(self) -> pd.DataFrame: "error": str(e), "error_trace": traceback.format_exc(), } - logger.warn(f"Encountered Error: {e}. Skipping.") - logger.warn(traceback.format_exc()) + logger.warning(f"Encountered Error: {e}. 
Skipping.") + logger.warning(traceback.format_exc()) logger.debug("===========Done===========") @@ -242,7 +242,7 @@ def _generate_report(self) -> tuple: self.models.df_wide_numeric, series=s_id ), self.datasets.list_series_ids(), - target_category_column=self.target_cat_col + target_category_column=self.target_cat_col, ) section_1 = rc.Block( rc.Heading("Forecast Overview", level=2), @@ -260,7 +260,9 @@ def _generate_report(self) -> tuple: ) except KeyError: - logger.warn("Issue generating Model Parameters Table Section. Skipping") + logger.warning( + "Issue generating Model Parameters Table Section. Skipping" + ) sec2 = rc.Text("Error generating model parameters.") section_2 = rc.Block(sec2_text, sec2) @@ -268,7 +270,7 @@ def _generate_report(self) -> tuple: all_sections = [section_1, section_2] if self.spec.generate_explanations: - logger.warn("Explanations not yet supported for the AutoTS Module") + logger.warning("Explanations not yet supported for the AutoTS Module") # Model Description model_description = rc.Text( diff --git a/ads/opctl/operator/lowcode/forecast/model/base_model.py b/ads/opctl/operator/lowcode/forecast/model/base_model.py index 6238bc5f5..ffa5e9614 100644 --- a/ads/opctl/operator/lowcode/forecast/model/base_model.py +++ b/ads/opctl/operator/lowcode/forecast/model/base_model.py @@ -29,7 +29,6 @@ seconds_to_datetime, write_data, ) -from ads.opctl.operator.lowcode.forecast.model.forecast_datasets import TestData from ads.opctl.operator.lowcode.forecast.utils import ( _build_metrics_df, _build_metrics_per_horizon, @@ -132,11 +131,10 @@ def generate_report(self): if self.datasets.test_data is not None: try: - ( - self.test_eval_metrics, - summary_metrics - ) = self._test_evaluate_metrics( - elapsed_time=elapsed_time, + (self.test_eval_metrics, summary_metrics) = ( + self._test_evaluate_metrics( + elapsed_time=elapsed_time, + ) ) if not self.target_cat_col: self.test_eval_metrics.rename( @@ -145,7 +143,7 @@ def generate_report(self): inplace=True, ) except Exception: - logger.warn("Unable to generate Test Metrics.") + logger.warning("Unable to generate Test Metrics.") logger.debug(f"Full Traceback: {traceback.format_exc()}") report_sections = [] @@ -369,7 +367,7 @@ def _test_evaluate_metrics(self, elapsed_time=0): -self.spec.horizon : ] except KeyError as ke: - logger.warn( + logger.warning( f"Error Generating Metrics: Unable to find {s_id} in the test data. Error: {ke.args}" ) y_pred = self.forecast_output.get_forecast(s_id)["forecast_value"].values[ @@ -542,7 +540,7 @@ def _save_report( ) results.set_metrics(metrics_df_formatted) else: - logger.warn( + logger.warning( f"Attempted to generate the {self.spec.metrics_filename} file with the training metrics, however the training metrics could not be properly generated." ) @@ -563,7 +561,7 @@ def _save_report( ) results.set_test_metrics(test_metrics_df_formatted) else: - logger.warn( + logger.warning( f"Attempted to generate the {self.spec.test_metrics_filename} file with the test metrics, however the test metrics could not be properly generated." ) # explanations csv reports @@ -581,7 +579,7 @@ def _save_report( ) results.set_global_explanations(self.formatted_global_explanation) else: - logger.warn( + logger.warning( f"Attempted to generate global explanations for the {self.spec.global_explanation_filename} file, but an issue occured in formatting the explanations." 
) @@ -597,11 +595,11 @@ def _save_report( ) results.set_local_explanations(self.formatted_local_explanation) else: - logger.warn( + logger.warning( f"Attempted to generate local explanations for the {self.spec.local_explanation_filename} file, but an issue occured in formatting the explanations." ) except AttributeError as e: - logger.warn( + logger.warning( "Unable to generate explanations for this model type or for this dataset." ) logger.debug(f"Got error: {e.args}") @@ -769,7 +767,7 @@ def explain_model(self): local_ex_time = local_ex_time + time.time() - exp_end_time if not len(kernel_explnr_vals): - logger.warn( + logger.warning( "No explanations generated. Ensure that additional data has been provided." ) else: @@ -780,7 +778,7 @@ def explain_model(self): ) ) else: - logger.warn( + logger.warning( f"Skipping explanations for {s_id}, as forecast was not generated." ) diff --git a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py index 4364e5cd9..342721c83 100644 --- a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py +++ b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py @@ -23,14 +23,14 @@ class HistoricalData(AbstractData): - def __init__(self, spec, historical_data = None): + def __init__(self, spec, historical_data=None): super().__init__(spec=spec, name="historical_data", data=historical_data) def _ingest_data(self, spec): try: self.freq = get_frequency_of_datetime(self.data.index.get_level_values(0)) except TypeError as e: - logger.warn( + logger.warning( f"Error determining frequency: {e.args}. Setting Frequency to None" ) logger.debug(f"Full traceback: {e}") @@ -106,7 +106,7 @@ def _ingest_data(self, spec): _spec = spec self.additional_regressors = list(self.data.columns) if not self.additional_regressors: - logger.warn( + logger.warning( f"No additional variables found in the additional_data. Only columns found: {self.data.columns}. Skipping for now." ) # Check that datetime column matches historical datetime column @@ -121,7 +121,13 @@ def __init__(self, spec, test_data): class ForecastDatasets: - def __init__(self, config: ForecastOperatorConfig, historical_data=None, additional_data=None, test_data=None): + def __init__( + self, + config: ForecastOperatorConfig, + historical_data=None, + additional_data=None, + test_data=None, + ): """Instantiates the DataIO instance. Properties @@ -136,7 +142,9 @@ def __init__(self, config: ForecastOperatorConfig, historical_data=None, additio self._target_col = config.spec.target_column if historical_data is not None: self.historical_data = HistoricalData(config.spec, historical_data) - self.additional_data = AdditionalData(config.spec, self.historical_data, additional_data) + self.additional_data = AdditionalData( + config.spec, self.historical_data, additional_data + ) else: self._load_data(config.spec) self.test_data = TestData(config.spec, test_data) @@ -147,7 +155,7 @@ def _load_data(self, spec): self.additional_data = AdditionalData(spec, self.historical_data) if spec.generate_explanations and spec.additional_data is None: - logger.warn( + logger.warning( "Unable to generate explanations as there is no additional data passed in. Either set generate_explanations to False, or pass in additional data." 
) spec.generate_explanations = False diff --git a/ads/opctl/operator/lowcode/forecast/model/ml_forecast.py b/ads/opctl/operator/lowcode/forecast/model/ml_forecast.py index 1911ebf0c..05c0dd606 100644 --- a/ads/opctl/operator/lowcode/forecast/model/ml_forecast.py +++ b/ads/opctl/operator/lowcode/forecast/model/ml_forecast.py @@ -168,8 +168,8 @@ def _train_model(self, data_train, data_test, model_kwargs): "error": str(e), "error_trace": traceback.format_exc(), } - logger.warn(f"Encountered Error: {e}. Skipping.") - logger.warn(traceback.format_exc()) + logger.warning(f"Encountered Error: {e}. Skipping.") + logger.warning(traceback.format_exc()) raise e def _build_model(self) -> pd.DataFrame: diff --git a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py index 5fd7fb644..5f33c72de 100644 --- a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py @@ -40,7 +40,7 @@ # "rmse": MeanSquaredError, # } # if selected_metric not in metric_translation.keys(): -# logger.warn( +# logger.warning( # f"Could not find the metric: {selected_metric} in torchmetrics. Defaulting to MAE and RMSE" # ) # return {"MAE": MeanAbsoluteError(), "RMSE": MeanSquaredError()} @@ -207,7 +207,7 @@ def _train_model(self, i, s_id, df, model_kwargs): "error": str(e), "error_trace": traceback.format_exc(), } - logger.warn(traceback.format_exc()) + logger.warning(traceback.format_exc()) raise e def _build_model(self) -> pd.DataFrame: @@ -363,7 +363,9 @@ def _generate_report(self): pd.Series( m.state_dict(), index=m.state_dict().keys(), - name=s_id if self.target_cat_col else self.original_target_column, + name=s_id + if self.target_cat_col + else self.original_target_column, ) ) all_model_states = pd.concat(model_states, axis=1) @@ -377,11 +379,15 @@ def _generate_report(self): self.explain_model() if not self.target_cat_col: - self.formatted_global_explanation = self.formatted_global_explanation.rename( - {"Series 1": self.original_target_column}, - axis=1, + self.formatted_global_explanation = ( + self.formatted_global_explanation.rename( + {"Series 1": self.original_target_column}, + axis=1, + ) + ) + self.formatted_local_explanation.drop( + "Series", axis=1, inplace=True ) - self.formatted_local_explanation.drop("Series", axis=1, inplace=True) # Create a markdown section for the global explainability global_explanation_section = rc.Block( @@ -412,7 +418,7 @@ def _generate_report(self): ] except Exception as e: # Do not fail the whole run due to explanations failure - logger.warn(f"Failed to generate Explanations with error: {e}.") + logger.warning(f"Failed to generate Explanations with error: {e}.") logger.debug(f"Full Traceback: {traceback.format_exc()}") model_description = rc.Text( diff --git a/ads/opctl/operator/lowcode/forecast/model/prophet.py b/ads/opctl/operator/lowcode/forecast/model/prophet.py index 5d4897432..1f9d62471 100644 --- a/ads/opctl/operator/lowcode/forecast/model/prophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/prophet.py @@ -44,12 +44,23 @@ def _fit_model(data, params, additional_regressors): from prophet import Prophet monthly_seasonality = params.pop("monthly_seasonality", False) + data_floor = params.pop("min", None) + data_cap = params.pop("max", None) + if data_cap or data_floor: + params["growth"] = "logistic" model = Prophet(**params) if monthly_seasonality: model.add_seasonality(name="monthly", period=30.5, fourier_order=5) 
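    # Prophet only enforces saturating bounds under logistic growth, which is
    # why "growth" is switched to "logistic" above whenever a "min" or "max"
    # is supplied; the bounds themselves are attached below as the "floor" and
    # "cap" columns that Prophet expects on the training data.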
params["monthly_seasonality"] = monthly_seasonality for add_reg in additional_regressors: model.add_regressor(add_reg) + if data_floor: + data["floor"] = float(data_floor) + params["floor"] = data_floor + if data_cap: + data["cap"] = float(data_cap) + params["cap"] = data_cap + model.fit(data) return model @@ -133,8 +144,8 @@ def _train_model(self, i, series_id, df, model_kwargs): "error": str(e), "error_trace": traceback.format_exc(), } - logger.warn(f"Encountered Error: {e}. Skipping.") - logger.warn(traceback.format_exc()) + logger.warning(f"Encountered Error: {e}. Skipping.") + logger.warning(traceback.format_exc()) def _build_model(self) -> pd.DataFrame: full_data_dict = self.datasets.get_data_by_series() @@ -149,9 +160,6 @@ def _build_model(self) -> pd.DataFrame: dt_column=self.spec.datetime_column.name, ) - # if os.environ["OCI__IS_SPARK"]: - # pass - # else: Parallel(n_jobs=-1, require="sharedmem")( delayed(ProphetOperatorModel._train_model)( self, i, series_id, df, model_kwargs.copy() @@ -222,7 +230,7 @@ def objective(trial): try: return np.mean(df_p[self.spec.metric]) except KeyError: - logger.warn( + logger.warning( f"Could not find the metric {self.spec.metric} within " f"the performance metrics: {df_p.columns}. Defaulting to `rmse`" ) @@ -274,7 +282,9 @@ def _generate_report(self): ) sec2 = _select_plot_list( - lambda s_id: self.models[s_id]["model"].plot_components(self.outputs[s_id]), + lambda s_id: self.models[s_id]["model"].plot_components( + self.outputs[s_id] + ), series_ids=series_ids, target_category_column=self.target_cat_col, ) @@ -283,11 +293,14 @@ def _generate_report(self): ) sec3_figs = { - s_id: self.models[s_id]["model"].plot(self.outputs[s_id]) for s_id in series_ids + s_id: self.models[s_id]["model"].plot(self.outputs[s_id]) + for s_id in series_ids } for s_id in series_ids: add_changepoints_to_plot( - sec3_figs[s_id].gca(), self.models[s_id]["model"], self.outputs[s_id] + sec3_figs[s_id].gca(), + self.models[s_id]["model"], + self.outputs[s_id], ) sec3 = _select_plot_list( lambda s_id: sec3_figs[s_id], @@ -378,7 +391,7 @@ def _generate_report(self): ] except Exception as e: # Do not fail the whole run due to explanations failure - logger.warn(f"Failed to generate Explanations with error: {e}.") + logger.warning(f"Failed to generate Explanations with error: {e}.") logger.debug(f"Full Traceback: {traceback.format_exc()}") model_description = rc.Text( diff --git a/ads/opctl/operator/lowcode/forecast/model_evaluator.py b/ads/opctl/operator/lowcode/forecast/model_evaluator.py index 41939e3ff..ad11f19bd 100644 --- a/ads/opctl/operator/lowcode/forecast/model_evaluator.py +++ b/ads/opctl/operator/lowcode/forecast/model_evaluator.py @@ -1,20 +1,21 @@ -# -*- coding: utf-8; -*- - # Copyright (c) 2023 Oracle and/or its affiliates. 
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ +from pathlib import Path + import numpy as np import pandas as pd -from pathlib import Path from ads.opctl import logger from ads.opctl.operator.lowcode.common.const import DataColumns +from ads.opctl.operator.lowcode.common.errors import InsufficientDataError from ads.opctl.operator.lowcode.forecast.const import BACKTEST_REPORT_NAME +from ads.opctl.operator.lowcode.forecast.model.factory import SupportedModels + from .model.forecast_datasets import ForecastDatasets from .operator_config import ForecastOperatorConfig -from ads.opctl.operator.lowcode.forecast.model.factory import SupportedModels -from ads.opctl.operator.lowcode.common.errors import InsufficientDataError + class ModelEvaluator: """ @@ -23,6 +24,7 @@ class ModelEvaluator: This class is responsible for comparing different models or frameworks based on specified evaluation metrics and returning the best-performing option. """ + def __init__(self, models, k=5, subsample_ratio=0.20): """ Initializes the ModelEvaluator with a list of models, number of backtests and subsample ratio. @@ -40,23 +42,33 @@ def __init__(self, models, k=5, subsample_ratio=0.20): def generate_cutoffs(self, unique_dates, horizon): sorted_dates = np.sort(unique_dates) - train_window_size = [len(sorted_dates) - (i + 1) * horizon for i in range(self.k)] + train_window_size = [ + len(sorted_dates) - (i + 1) * horizon for i in range(self.k) + ] valid_train_window_size = [ws for ws in train_window_size if ws >= horizon * 2] if len(valid_train_window_size) < self.k: - logger.warn(f"Only {valid_train_window_size} backtests can be created") - cut_offs = sorted_dates[-horizon - 1:-horizon * (self.k + 1):-horizon][:len(valid_train_window_size)] + logger.warning(f"Only {valid_train_window_size} backtests can be created") + cut_offs = sorted_dates[-horizon - 1 : -horizon * (self.k + 1) : -horizon][ + : len(valid_train_window_size) + ] return cut_offs - def generate_k_fold_data(self, datasets: ForecastDatasets, operator_config: ForecastOperatorConfig): + def generate_k_fold_data( + self, datasets: ForecastDatasets, operator_config: ForecastOperatorConfig + ): date_col = operator_config.spec.datetime_column.name horizon = operator_config.spec.horizon historical_data = datasets.historical_data.data.reset_index() series_col = DataColumns.Series group_counts = historical_data[series_col].value_counts() - sample_count = max(self.minimum_sample_count, int(len(group_counts) * self.subsample_ratio)) + sample_count = max( + self.minimum_sample_count, int(len(group_counts) * self.subsample_ratio) + ) sampled_groups = group_counts.head(sample_count) - sampled_historical_data = historical_data[historical_data[series_col].isin(sampled_groups.index)] + sampled_historical_data = historical_data[ + historical_data[series_col].isin(sampled_groups.index) + ] min_group = group_counts.idxmin() min_series_data = historical_data[historical_data[series_col] == min_group] @@ -64,32 +76,62 @@ def generate_k_fold_data(self, datasets: ForecastDatasets, operator_config: Fore cut_offs = self.generate_cutoffs(unique_dates, horizon) if not len(cut_offs): - raise InsufficientDataError("Insufficient data to evaluate multiple models. 
Please specify a model " - "instead of using auto-select.") - training_datasets = [sampled_historical_data[sampled_historical_data[date_col] <= cut_off_date] for cut_off_date - in cut_offs] - test_datasets = [sampled_historical_data[sampled_historical_data[date_col] > cut_offs[0]]] + raise InsufficientDataError( + "Insufficient data to evaluate multiple models. Please specify a model " + "instead of using auto-select." + ) + training_datasets = [ + sampled_historical_data[sampled_historical_data[date_col] <= cut_off_date] + for cut_off_date in cut_offs + ] + test_datasets = [ + sampled_historical_data[sampled_historical_data[date_col] > cut_offs[0]] + ] for i, current in enumerate(cut_offs[1:]): - test_datasets.append(sampled_historical_data[(current < sampled_historical_data[date_col]) & ( - sampled_historical_data[date_col] <= cut_offs[i])]) + test_datasets.append( + sampled_historical_data[ + (current < sampled_historical_data[date_col]) + & (sampled_historical_data[date_col] <= cut_offs[i]) + ] + ) all_additional = datasets.additional_data.data.reset_index() - sampled_additional_data = all_additional[all_additional[series_col].isin(sampled_groups.index)] + sampled_additional_data = all_additional[ + all_additional[series_col].isin(sampled_groups.index) + ] max_historical_date = sampled_historical_data[date_col].max() - additional_data = [sampled_additional_data[sampled_additional_data[date_col] <= max_historical_date]] + additional_data = [ + sampled_additional_data[ + sampled_additional_data[date_col] <= max_historical_date + ] + ] for cut_off in cut_offs[:-1]: - trimmed_additional_data = sampled_additional_data[sampled_additional_data[date_col] <= cut_off] + trimmed_additional_data = sampled_additional_data[ + sampled_additional_data[date_col] <= cut_off + ] additional_data.append(trimmed_additional_data) return cut_offs, training_datasets, additional_data, test_datasets def remove_none_values(self, obj): if isinstance(obj, dict): - return {k: self.remove_none_values(v) for k, v in obj.items() if k is not None and v is not None} + return { + k: self.remove_none_values(v) + for k, v in obj.items() + if k is not None and v is not None + } else: return obj - def create_operator_config(self, operator_config, backtest, model, historical_data, additional_data, test_data): + def create_operator_config( + self, + operator_config, + backtest, + model, + historical_data, + additional_data, + test_data, + ): output_dir = operator_config.spec.output_directory.url - output_file_path = f'{output_dir}/back_testing/{model}/{backtest}' + output_file_path = f"{output_dir}/back_testing/{model}/{backtest}" Path(output_file_path).mkdir(parents=True, exist_ok=True) backtest_op_config_draft = operator_config.to_dict() backtest_spec = backtest_op_config_draft["spec"] @@ -99,62 +141,102 @@ def create_operator_config(self, operator_config, backtest, model, historical_da backtest_spec.pop("historical_data") backtest_spec["generate_report"] = False backtest_spec["model"] = model - backtest_spec['model_kwargs'] = None + backtest_spec["model_kwargs"] = None backtest_spec["output_directory"] = {"url": output_file_path} backtest_spec["target_category_columns"] = [DataColumns.Series] - backtest_spec['generate_explanations'] = False + backtest_spec["generate_explanations"] = False cleaned_config = self.remove_none_values(backtest_op_config_draft) - backtest_op_config = ForecastOperatorConfig.from_dict( - obj_dict=cleaned_config) + backtest_op_config = ForecastOperatorConfig.from_dict(obj_dict=cleaned_config) return 
backtest_op_config - def run_all_models(self, datasets: ForecastDatasets, operator_config: ForecastOperatorConfig): - cut_offs, train_sets, additional_data, test_sets = self.generate_k_fold_data(datasets, operator_config) + def run_all_models( + self, datasets: ForecastDatasets, operator_config: ForecastOperatorConfig + ): + cut_offs, train_sets, additional_data, test_sets = self.generate_k_fold_data( + datasets, operator_config + ) metrics = {} date_col = operator_config.spec.datetime_column.name for model in self.models: from .model.factory import ForecastOperatorModelFactory + metrics[model] = {} for i in range(len(cut_offs)): try: - backtest_historical_data = train_sets[i].set_index([date_col, DataColumns.Series]) - backtest_additional_data = additional_data[i].set_index([date_col, DataColumns.Series]) - backtest_test_data = test_sets[i].set_index([date_col, DataColumns.Series]) - backtest_operator_config = self.create_operator_config(operator_config, i, model, - backtest_historical_data, - backtest_additional_data, - backtest_test_data) - datasets = ForecastDatasets(backtest_operator_config, - backtest_historical_data, - backtest_additional_data, - backtest_test_data) + backtest_historical_data = train_sets[i].set_index( + [date_col, DataColumns.Series] + ) + backtest_additional_data = additional_data[i].set_index( + [date_col, DataColumns.Series] + ) + backtest_test_data = test_sets[i].set_index( + [date_col, DataColumns.Series] + ) + backtest_operator_config = self.create_operator_config( + operator_config, + i, + model, + backtest_historical_data, + backtest_additional_data, + backtest_test_data, + ) + datasets = ForecastDatasets( + backtest_operator_config, + backtest_historical_data, + backtest_additional_data, + backtest_test_data, + ) ForecastOperatorModelFactory.get_model( backtest_operator_config, datasets ).generate_report() - test_metrics_filename = backtest_operator_config.spec.test_metrics_filename + test_metrics_filename = ( + backtest_operator_config.spec.test_metrics_filename + ) metrics_df = pd.read_csv( - f"{backtest_operator_config.spec.output_directory.url}/{test_metrics_filename}") - metrics_df["average_across_series"] = metrics_df.drop('metrics', axis=1).mean(axis=1) - metrics_average_dict = dict(zip(metrics_df['metrics'].str.lower(), metrics_df['average_across_series'])) - metrics[model][i] = metrics_average_dict[operator_config.spec.metric] + f"{backtest_operator_config.spec.output_directory.url}/{test_metrics_filename}" + ) + metrics_df["average_across_series"] = metrics_df.drop( + "metrics", axis=1 + ).mean(axis=1) + metrics_average_dict = dict( + zip( + metrics_df["metrics"].str.lower(), + metrics_df["average_across_series"], + ) + ) + metrics[model][i] = metrics_average_dict[ + operator_config.spec.metric + ] except: - logger.warn(f"Failed to calculate metrics for {model} and {i} backtest") + logger.warning( + f"Failed to calculate metrics for {model} and {i} backtest" + ) return metrics - def find_best_model(self, datasets: ForecastDatasets, operator_config: ForecastOperatorConfig): + def find_best_model( + self, datasets: ForecastDatasets, operator_config: ForecastOperatorConfig + ): try: metrics = self.run_all_models(datasets, operator_config) except InsufficientDataError as e: model = SupportedModels.Prophet - logger.error(f"Running {model} model as auto-select failed with the following error: {e.message}") + logger.error( + f"Running {model} model as auto-select failed with the following error: {e.message}" + ) return model - nonempty_metrics = {model: 
metric for model, metric in metrics.items() if metric != {}} - avg_backtests_metric = {model: sum(value.values()) / len(value.values()) - for model, value in nonempty_metrics.items()} + nonempty_metrics = { + model: metric for model, metric in metrics.items() if metric != {} + } + avg_backtests_metric = { + model: sum(value.values()) / len(value.values()) + for model, value in nonempty_metrics.items() + } best_model = min(avg_backtests_metric, key=avg_backtests_metric.get) - logger.info(f"Among models {self.models}, {best_model} model shows better performance during backtesting.") - backtest_stats = pd.DataFrame(nonempty_metrics).rename_axis('backtest') + logger.info( + f"Among models {self.models}, {best_model} model shows better performance during backtesting." + ) + backtest_stats = pd.DataFrame(nonempty_metrics).rename_axis("backtest") backtest_stats["metric"] = operator_config.spec.metric backtest_stats.reset_index(inplace=True) output_dir = operator_config.spec.output_directory.url diff --git a/ads/opctl/operator/lowcode/forecast/whatifserve/deployment_manager.py b/ads/opctl/operator/lowcode/forecast/whatifserve/deployment_manager.py index 0e893a45b..4eb5afbd7 100644 --- a/ads/opctl/operator/lowcode/forecast/whatifserve/deployment_manager.py +++ b/ads/opctl/operator/lowcode/forecast/whatifserve/deployment_manager.py @@ -1,5 +1,4 @@ #!/usr/bin/env python -import json # Copyright (c) 2023, 2024 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ @@ -8,39 +7,58 @@ import shutil import sys import tempfile -import oci -import pandas as pd import cloudpickle +import oci +import pandas as pd +from oci.data_science import DataScienceClient, DataScienceClientCompositeOperations +from oci.data_science.models import ( + CategoryLogDetails, + CreateModelDeploymentDetails, + FixedSizeScalingPolicy, + InstanceConfiguration, + LogDetails, + ModelConfigurationDetails, + SingleModelDeploymentConfigurationDetails, +) -from ads.opctl import logger from ads.common.model_export_util import prepare_generic_model +from ads.common.object_storage_details import ObjectStorageDetails +from ads.opctl import logger from ads.opctl.operator.common.utils import create_log_in_log_group -from ads.opctl.operator.lowcode.common.utils import write_data, write_simple_json -from ads.opctl.operator.lowcode.common.utils import default_signer +from ads.opctl.operator.lowcode.common.utils import ( + default_signer, + write_data, + write_simple_json, +) + from ..model.forecast_datasets import AdditionalData from ..operator_config import ForecastOperatorSpec -from oci.data_science import DataScienceClient, DataScienceClientCompositeOperations - -from oci.data_science.models import ModelConfigurationDetails, InstanceConfiguration, \ - FixedSizeScalingPolicy, CategoryLogDetails, LogDetails, \ - SingleModelDeploymentConfigurationDetails, CreateModelDeploymentDetails -from ads.common.object_storage_details import ObjectStorageDetails - class ModelDeploymentManager: - def __init__(self, spec: ForecastOperatorSpec, additional_data: AdditionalData, previous_model_version=None): + def __init__( + self, + spec: ForecastOperatorSpec, + additional_data: AdditionalData, + previous_model_version=None, + ): self.spec = spec self.model_name = spec.model self.horizon = spec.horizon self.additional_data = additional_data.get_dict_by_series() self.model_obj = {} self.display_name = spec.what_if_analysis.model_display_name - self.project_id = 
spec.what_if_analysis.project_id if spec.what_if_analysis.project_id \ - else os.environ.get('PROJECT_OCID') - self.compartment_id = spec.what_if_analysis.compartment_id if spec.what_if_analysis.compartment_id \ - else os.environ.get('NB_SESSION_COMPARTMENT_OCID') + self.project_id = ( + spec.what_if_analysis.project_id + if spec.what_if_analysis.project_id + else os.environ.get("PROJECT_OCID") + ) + self.compartment_id = ( + spec.what_if_analysis.compartment_id + if spec.what_if_analysis.compartment_id + else os.environ.get("NB_SESSION_COMPARTMENT_OCID") + ) if self.project_id is None or self.compartment_id is None: raise ValueError("Either project_id or compartment_id cannot be None.") self.path_to_artifact = f"{self.spec.output_directory.url}/artifacts/" @@ -58,17 +76,23 @@ def _sanity_test(self): try: sys.path.insert(0, f"{self.path_to_artifact}") from score import load_model, predict + _ = load_model() # Write additional data to tmp file and perform sanity check - with tempfile.NamedTemporaryFile(suffix='.csv') as temp_file: + with tempfile.NamedTemporaryFile(suffix=".csv") as temp_file: one_series = next(iter(self.additional_data)) - sample_prediction_data = self.additional_data[one_series].tail(self.horizon) - sample_prediction_data[self.spec.target_category_columns[0]] = one_series + sample_prediction_data = self.additional_data[one_series].tail( + self.horizon + ) + sample_prediction_data[self.spec.target_category_columns[0]] = ( + one_series + ) date_col_name = self.spec.datetime_column.name date_col_format = self.spec.datetime_column.format - sample_prediction_data[date_col_name] = sample_prediction_data[date_col_name].dt.strftime( - date_col_format) + sample_prediction_data[date_col_name] = sample_prediction_data[ + date_col_name + ].dt.strftime(date_col_format) sample_prediction_data.to_csv(temp_file.name, index=False) input_data = {"additional_data": {"url": temp_file.name}} prediction_test = predict(input_data, _) @@ -86,16 +110,18 @@ def _copy_score_file(self): try: current_dir = os.path.dirname(os.path.abspath(__file__)) score_file = os.path.join(current_dir, "score.py") - destination_file = os.path.join(self.path_to_artifact, os.path.basename(score_file)) + destination_file = os.path.join( + self.path_to_artifact, os.path.basename(score_file) + ) shutil.copy2(score_file, destination_file) logger.info(f"score.py copied successfully to {self.path_to_artifact}") except Exception as e: - logger.warn(f"Error copying file: {e}") + logger.warning(f"Error copying file: {e}") raise e def save_to_catalog(self): """Save the model to a model catalog""" - with open(self.pickle_file_path, 'rb') as file: + with open(self.pickle_file_path, "rb") as file: self.model_obj = pickle.load(file) if not os.path.exists(self.path_to_artifact): @@ -108,7 +134,8 @@ def save_to_catalog(self): self.path_to_artifact, function_artifacts=False, force_overwrite=True, - data_science_env=True) + data_science_env=True, + ) self._copy_score_file() self._sanity_test() @@ -124,11 +151,14 @@ def save_to_catalog(self): display_name=self.display_name, compartment_id=self.compartment_id, project_id=self.project_id, - description=description) + description=description, + ) self.catalog_id = catalog_entry.id - logger.info(f"Saved {self.model_name} version-v{self.model_version} to model catalog" - f" with model ocid : {self.catalog_id}") + logger.info( + f"Saved {self.model_name} version-v{self.model_version} to model catalog" + f" with model ocid : {self.catalog_id}" + ) self.deployment_info = {"model_ocid": 
self.catalog_id, "series": list(series)} @@ -154,19 +184,25 @@ def create_deployment(self): metric_type=auto_scaling_config.scaling_metric, scale_in_configuration=oci.data_science.models.PredefinedExpressionThresholdScalingConfiguration( scaling_configuration_type="THRESHOLD", - threshold=auto_scaling_config.scale_in_threshold + threshold=auto_scaling_config.scale_in_threshold, ), scale_out_configuration=oci.data_science.models.PredefinedExpressionThresholdScalingConfiguration( scaling_configuration_type="THRESHOLD", - threshold=auto_scaling_config.scale_out_threshold - ) - )], + threshold=auto_scaling_config.scale_out_threshold, + ), + ) + ], maximum_instance_count=auto_scaling_config.maximum_instance, minimum_instance_count=auto_scaling_config.minimum_instance, - initial_instance_count=auto_scaling_config.minimum_instance)], + initial_instance_count=auto_scaling_config.minimum_instance, + ) + ], cool_down_in_seconds=auto_scaling_config.cool_down_in_seconds, - is_enabled=True) - logger.info(f"Using autoscaling {auto_scaling_config.scaling_metric} for creating MD") + is_enabled=True, + ) + logger.info( + f"Using autoscaling {auto_scaling_config.scaling_metric} for creating MD" + ) else: scaling_policy = FixedSizeScalingPolicy(instance_count=1) logger.info("Using fixed size policy for creating MD") @@ -174,13 +210,15 @@ def create_deployment(self): model_configuration_details_object = ModelConfigurationDetails( model_id=self.catalog_id, instance_configuration=InstanceConfiguration( - instance_shape_name=initial_shape), + instance_shape_name=initial_shape + ), scaling_policy=scaling_policy, - bandwidth_mbps=20) + bandwidth_mbps=20, + ) single_model_config = SingleModelDeploymentConfigurationDetails( - deployment_type='SINGLE_MODEL', - model_configuration_details=model_configuration_details_object + deployment_type="SINGLE_MODEL", + model_configuration_details=model_configuration_details_object, ) log_group = self.spec.what_if_analysis.model_deployment.log_group @@ -191,10 +229,9 @@ def create_deployment(self): log_id = create_log_in_log_group(self.compartment_id, log_group, auth) logs_configuration_details_object = CategoryLogDetails( - access=LogDetails(log_group_id=log_group, - log_id=log_id), - predict=LogDetails(log_group_id=log_group, - log_id=log_id)) + access=LogDetails(log_group_id=log_group, log_id=log_id), + predict=LogDetails(log_group_id=log_group, log_id=log_id), + ) model_deploy_configuration = CreateModelDeploymentDetails( display_name=name, @@ -202,24 +239,30 @@ def create_deployment(self): project_id=self.project_id, compartment_id=self.compartment_id, model_deployment_configuration_details=single_model_config, - category_log_details=logs_configuration_details_object) + category_log_details=logs_configuration_details_object, + ) if not self.test_mode: auth = oci.auth.signers.get_resource_principals_signer() data_science = DataScienceClient({}, signer=auth) data_science_composite = DataScienceClientCompositeOperations(data_science) - model_deployment = data_science_composite.create_model_deployment_and_wait_for_state( - model_deploy_configuration, - wait_for_states=[ - "SUCCEEDED", "FAILED"]) - self.deployment_info['work_request'] = model_deployment.data.id + model_deployment = ( + data_science_composite.create_model_deployment_and_wait_for_state( + model_deploy_configuration, wait_for_states=["SUCCEEDED", "FAILED"] + ) + ) + self.deployment_info["work_request"] = model_deployment.data.id logger.info(f"deployment metadata :{model_deployment.data}") - md = 
data_science.get_model_deployment(model_deployment_id=model_deployment.data.resources[0].identifier) - self.deployment_info['model_deployment_ocid'] = md.data.id - self.deployment_info['status'] = md.data.lifecycle_state + md = data_science.get_model_deployment( + model_deployment_id=model_deployment.data.resources[0].identifier + ) + self.deployment_info["model_deployment_ocid"] = md.data.id + self.deployment_info["status"] = md.data.lifecycle_state endpoint_url = md.data.model_deployment_url - self.deployment_info['model_deployment_endpoint'] = f"{endpoint_url}/predict" - self.deployment_info['log_id'] = log_id + self.deployment_info["model_deployment_endpoint"] = ( + f"{endpoint_url}/predict" + ) + self.deployment_info["log_id"] = log_id def save_deployment_info(self): output_dir = self.spec.output_directory.url @@ -234,7 +277,9 @@ def save_deployment_info(self): storage_options=storage_options, index=False, indent=4, - orient="records" + orient="records", + ) + write_simple_json( + self.deployment_info, os.path.join(output_dir, "deployment_info.json") ) - write_simple_json(self.deployment_info, os.path.join(output_dir, "deployment_info.json")) logger.info(f"Saved deployment info to {output_dir}") diff --git a/docs/source/user_guide/operators/forecast_operator/index.rst b/docs/source/user_guide/operators/forecast_operator/index.rst index 60993863f..bd0dd03d9 100644 --- a/docs/source/user_guide/operators/forecast_operator/index.rst +++ b/docs/source/user_guide/operators/forecast_operator/index.rst @@ -105,6 +105,29 @@ The metric can be optionally specified in the YAML file: target_column: y metric: rmse +Setting a Minimum or Maximum Value +---------------------------------- + +You can set a minimum or maximum output value by setting ``min`` or ``max`` in the ``model_kwargs`` section of the yaml file + +.. 
code-block:: yaml + + kind: operator + type: forecast + version: v1 + spec: + datetime_column: + name: ds + historical_data: + url: https://raw.githubusercontent.com/facebook/prophet/main/examples/example_yosemite_temps.csv + horizon: 3 + model: prophet + target_column: y + model_kwargs: + min: 0 + max: 100 + + Explanations ------------ diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py index 02907c404..62f1cd132 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -739,7 +739,7 @@ def test_pandas_historical_input(operator_setup, model): ) tmpdirname = operator_setup - historical_data_path, additional_data_path = setup_small_rossman() + historical_data_path, additional_data_path = setup_artificial_data() yaml_i, output_data_path = populate_yaml( tmpdirname=tmpdirname, historical_data_path=historical_data_path, @@ -835,9 +835,9 @@ def test_what_if_analysis(operator_setup, model): historical_data = pd.read_csv(historical_data_path, parse_dates=["Date"]) historical_filtered = historical_data[historical_data["Date"] > "2013-03-01"] additional_data = pd.read_csv(additional_data_path, parse_dates=["Date"]) - add_filtered = additional_data[additional_data['Date'] > "2013-03-01"] - add_filtered.to_csv(f'{additional_test_path}', index=False) - historical_filtered.to_csv(f'{historical_test_path}', index=False) + add_filtered = additional_data[additional_data["Date"] > "2013-03-01"] + add_filtered.to_csv(f"{additional_test_path}", index=False) + historical_filtered.to_csv(f"{historical_test_path}", index=False) yaml_i, output_data_path = populate_yaml( tmpdirname=tmpdirname, @@ -893,5 +893,51 @@ def test_auto_select(operator_setup): report_path = f"{output_data_path}/report.html" assert os.path.exists(report_path), f"Report file not found at {report_path}" + +@pytest.mark.parametrize("model", ["prophet"]) +def test_prophet_floor_cap(operator_setup, model): + from ads.opctl.operator.lowcode.forecast.__main__ import operate + from ads.opctl.operator.lowcode.forecast.operator_config import ( + ForecastOperatorConfig, + ) + + yaml_i = TEMPLATE_YAML.copy() + yaml_i["spec"]["horizon"] = 10 + yaml_i["spec"]["model"] = model + yaml_i["spec"]["historical_data"] = {"format": "pandas"} + yaml_i["spec"]["target_column"] = "target" + yaml_i["spec"]["datetime_column"]["name"] = HISTORICAL_DATETIME_COL.name + yaml_i["spec"]["output_directory"]["url"] = operator_setup + yaml_i["spec"]["model_kwargs"] = {"max": 20, "min": 0} + + target_column = pd.Series(np.arange(20, -6, -2), name="target") + df = pd.concat( + [HISTORICAL_DATETIME_COL[: len(target_column)], target_column], axis=1 + ) + yaml_i["spec"]["historical_data"]["data"] = df + operator_config = ForecastOperatorConfig.from_dict(yaml_i) + results = operate(operator_config) + assert np.all( + results.get_forecast()["forecast_value"].dropna() > 0 + ), "`min` not obeyed in prophet" + assert np.all( + results.get_forecast()["fitted_value"].dropna() > 0 + ), "`min` not obeyed in prophet" + + target_column = pd.Series(np.arange(-6, 20, 2), name="target") + df = pd.concat( + [HISTORICAL_DATETIME_COL[: len(target_column)], target_column], axis=1 + ) + yaml_i["spec"]["historical_data"]["data"] = df + operator_config = ForecastOperatorConfig.from_dict(yaml_i) + results = operate(operator_config) + assert np.all( + results.get_forecast()["forecast_value"].dropna() < 20 + ), "`max` not obeyed in prophet" + assert np.all( + results.get_forecast()["fitted_value"].dropna() < 20 + ), 
"`max` not obeyed in prophet" + + if __name__ == "__main__": pass From 71f51ceb236c988fb0e8dbc92c20ffc059f34a20 Mon Sep 17 00:00:00 2001 From: Allen Date: Fri, 14 Mar 2025 21:39:39 +0000 Subject: [PATCH 03/18] enable report title --- .../lowcode/forecast/model/base_model.py | 16 ++++----- tests/operators/forecast/test_errors.py | 34 +++++++++++++++++-- 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/base_model.py b/ads/opctl/operator/lowcode/forecast/model/base_model.py index 6238bc5f5..4047b4fc9 100644 --- a/ads/opctl/operator/lowcode/forecast/model/base_model.py +++ b/ads/opctl/operator/lowcode/forecast/model/base_model.py @@ -29,7 +29,6 @@ seconds_to_datetime, write_data, ) -from ads.opctl.operator.lowcode.forecast.model.forecast_datasets import TestData from ads.opctl.operator.lowcode.forecast.utils import ( _build_metrics_df, _build_metrics_per_horizon, @@ -132,11 +131,10 @@ def generate_report(self): if self.datasets.test_data is not None: try: - ( - self.test_eval_metrics, - summary_metrics - ) = self._test_evaluate_metrics( - elapsed_time=elapsed_time, + (self.test_eval_metrics, summary_metrics) = ( + self._test_evaluate_metrics( + elapsed_time=elapsed_time, + ) ) if not self.target_cat_col: self.test_eval_metrics.rename( @@ -155,9 +153,9 @@ def generate_report(self): model_description, other_sections, ) = self._generate_report() - + report_title = self.config.spec.report_title or "Forecast Report" header_section = rc.Block( - rc.Heading("Forecast Report", level=1), + rc.Heading(report_title, level=1), rc.Text( f"You selected the {self.spec.model} model.\nBased on your dataset, you could have also selected any of the models: {SupportedModels.keys()}." ), @@ -471,7 +469,7 @@ def _save_report( result_df: pd.DataFrame, metrics_df: pd.DataFrame, test_metrics_df: pd.DataFrame, - test_data: pd.DataFrame, + # test_data: pd.DataFrame, ): """Saves resulting reports to the given folder.""" diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py index 02907c404..f35abff50 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -835,9 +835,9 @@ def test_what_if_analysis(operator_setup, model): historical_data = pd.read_csv(historical_data_path, parse_dates=["Date"]) historical_filtered = historical_data[historical_data["Date"] > "2013-03-01"] additional_data = pd.read_csv(additional_data_path, parse_dates=["Date"]) - add_filtered = additional_data[additional_data['Date'] > "2013-03-01"] - add_filtered.to_csv(f'{additional_test_path}', index=False) - historical_filtered.to_csv(f'{historical_test_path}', index=False) + add_filtered = additional_data[additional_data["Date"] > "2013-03-01"] + add_filtered.to_csv(f"{additional_test_path}", index=False) + historical_filtered.to_csv(f"{historical_test_path}", index=False) yaml_i, output_data_path = populate_yaml( tmpdirname=tmpdirname, @@ -893,5 +893,33 @@ def test_auto_select(operator_setup): report_path = f"{output_data_path}/report.html" assert os.path.exists(report_path), f"Report file not found at {report_path}" + +@pytest.mark.parametrize("model", ["prophet"]) +def test_report_title(operator_setup, model): + from ads.opctl.operator.lowcode.forecast.__main__ import operate + from ads.opctl.operator.lowcode.forecast.operator_config import ( + ForecastOperatorConfig, + ) + + yaml_i = TEMPLATE_YAML.copy() + yaml_i["spec"]["horizon"] = 10 + yaml_i["spec"]["model"] = model + yaml_i["spec"]["historical_data"] = 
{"format": "pandas"} + yaml_i["spec"]["target_column"] = TARGET_COL.name + yaml_i["spec"]["datetime_column"]["name"] = HISTORICAL_DATETIME_COL.name + yaml_i["spec"]["report_title"] = "Skibidi ADS Skibidi" + yaml_i["spec"]["output_directory"]["url"] = operator_setup + + df = pd.concat([HISTORICAL_DATETIME_COL[:15], TARGET_COL[:15]], axis=1) + yaml_i["spec"]["historical_data"]["data"] = df + operator_config = ForecastOperatorConfig.from_dict(yaml_i) + results = operate(operator_config) + with open(os.path.join(operator_setup, "report.html")) as f: + for line in f: + if "Skibidi ADS Skibidi" in line: + return True + assert False, "Report Title was not set" + + if __name__ == "__main__": pass From 6cb17a879f54ac1f16677e5e233b9b6d1c3ced2a Mon Sep 17 00:00:00 2001 From: Allen Date: Fri, 14 Mar 2025 22:18:00 +0000 Subject: [PATCH 04/18] add options to disable file generation --- .../lowcode/forecast/model/base_model.py | 100 ++++++++++-------- .../lowcode/forecast/operator_config.py | 31 +++++- .../forecast_operator/development.rst | 29 +++++ tests/operators/forecast/test_errors.py | 35 ++++++ 4 files changed, 146 insertions(+), 49 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/base_model.py b/ads/opctl/operator/lowcode/forecast/model/base_model.py index 4047b4fc9..dae92ec85 100644 --- a/ads/opctl/operator/lowcode/forecast/model/base_model.py +++ b/ads/opctl/operator/lowcode/forecast/model/base_model.py @@ -469,17 +469,18 @@ def _save_report( result_df: pd.DataFrame, metrics_df: pd.DataFrame, test_metrics_df: pd.DataFrame, - # test_data: pd.DataFrame, + test_data: pd.DataFrame, ): """Saves resulting reports to the given folder.""" unique_output_dir = self.spec.output_directory.url results = ForecastResults() - if ObjectStorageDetails.is_oci_path(unique_output_dir): - storage_options = default_signer() - else: - storage_options = {} + storage_options = ( + default_signer() + if ObjectStorageDetails.is_oci_path(unique_output_dir) + else {} + ) # report-creator html report if self.spec.generate_report: @@ -510,12 +511,13 @@ def _save_report( if self.target_cat_col else result_df.drop(DataColumns.Series, axis=1) ) - write_data( - data=result_df, - filename=os.path.join(unique_output_dir, self.spec.forecast_filename), - format="csv", - storage_options=storage_options, - ) + if self.spec.generate_forecast_file: + write_data( + data=result_df, + filename=os.path.join(unique_output_dir, self.spec.forecast_filename), + format="csv", + storage_options=storage_options, + ) results.set_forecast(result_df) # metrics csv report @@ -529,15 +531,16 @@ def _save_report( metrics_df_formatted = metrics_df.reset_index().rename( {"index": "metrics", "Series 1": metrics_col_name}, axis=1 ) - write_data( - data=metrics_df_formatted, - filename=os.path.join( - unique_output_dir, self.spec.metrics_filename - ), - format="csv", - storage_options=storage_options, - index=False, - ) + if self.spec.generate_metrics_file: + write_data( + data=metrics_df_formatted, + filename=os.path.join( + unique_output_dir, self.spec.metrics_filename + ), + format="csv", + storage_options=storage_options, + index=False, + ) results.set_metrics(metrics_df_formatted) else: logger.warn( @@ -550,15 +553,16 @@ def _save_report( test_metrics_df_formatted = test_metrics_df.reset_index().rename( {"index": "metrics", "Series 1": metrics_col_name}, axis=1 ) - write_data( - data=test_metrics_df_formatted, - filename=os.path.join( - unique_output_dir, self.spec.test_metrics_filename - ), - format="csv", - 
storage_options=storage_options, - index=False, - ) + if self.spec.generate_metrics_file: + write_data( + data=test_metrics_df_formatted, + filename=os.path.join( + unique_output_dir, self.spec.test_metrics_filename + ), + format="csv", + storage_options=storage_options, + index=False, + ) results.set_test_metrics(test_metrics_df_formatted) else: logger.warn( @@ -568,15 +572,16 @@ def _save_report( if self.spec.generate_explanations: try: if not self.formatted_global_explanation.empty: - write_data( - data=self.formatted_global_explanation, - filename=os.path.join( - unique_output_dir, self.spec.global_explanation_filename - ), - format="csv", - storage_options=storage_options, - index=True, - ) + if not self.spec.generate_explanations_file: + write_data( + data=self.formatted_global_explanation, + filename=os.path.join( + unique_output_dir, self.spec.global_explanation_filename + ), + format="csv", + storage_options=storage_options, + index=True, + ) results.set_global_explanations(self.formatted_global_explanation) else: logger.warn( @@ -584,15 +589,16 @@ def _save_report( ) if not self.formatted_local_explanation.empty: - write_data( - data=self.formatted_local_explanation, - filename=os.path.join( - unique_output_dir, self.spec.local_explanation_filename - ), - format="csv", - storage_options=storage_options, - index=True, - ) + if not self.spec.generate_explanations_file: + write_data( + data=self.formatted_local_explanation, + filename=os.path.join( + unique_output_dir, self.spec.local_explanation_filename + ), + format="csv", + storage_options=storage_options, + index=True, + ) results.set_local_explanations(self.formatted_local_explanation) else: logger.warn( diff --git a/ads/opctl/operator/lowcode/forecast/operator_config.py b/ads/opctl/operator/lowcode/forecast/operator_config.py index 40bb1dc11..2c74eb8af 100644 --- a/ads/opctl/operator/lowcode/forecast/operator_config.py +++ b/ads/opctl/operator/lowcode/forecast/operator_config.py @@ -18,9 +18,11 @@ from .const import SpeedAccuracyMode, SupportedMetrics, SupportedModels + @dataclass class AutoScaling(DataClassSerializable): """Class representing simple autoscaling policy""" + minimum_instance: int = 1 maximum_instance: int = None cool_down_in_seconds: int = 600 @@ -28,9 +30,11 @@ class AutoScaling(DataClassSerializable): scale_out_threshold: int = 80 scaling_metric: str = "CPU_UTILIZATION" + @dataclass(repr=True) class ModelDeploymentServer(DataClassSerializable): """Class representing model deployment server specification for whatif-analysis.""" + display_name: str = None initial_shape: str = None description: str = None @@ -42,10 +46,13 @@ class ModelDeploymentServer(DataClassSerializable): @dataclass(repr=True) class WhatIfAnalysis(DataClassSerializable): """Class representing operator specification for whatif-analysis.""" + model_display_name: str = None compartment_id: str = None project_id: str = None - model_deployment: ModelDeploymentServer = field(default_factory=ModelDeploymentServer) + model_deployment: ModelDeploymentServer = field( + default_factory=ModelDeploymentServer + ) @dataclass(repr=True) @@ -106,8 +113,11 @@ class ForecastOperatorSpec(DataClassSerializable): datetime_column: DateTimeColumn = field(default_factory=DateTimeColumn) target_category_columns: List[str] = field(default_factory=list) generate_report: bool = None + generate_forecast_file: bool = None generate_metrics: bool = None + generate_metrics_file: bool = None generate_explanations: bool = None + generate_explanations_file: bool = None 
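    # The generate_*_file flags above only control whether the corresponding
    # CSV files are written to the output directory; they default to True in
    # __post_init__, and the in-memory ForecastResults are populated either way.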
explanations_accuracy_mode: str = None horizon: int = None model: str = None @@ -126,7 +136,9 @@ def __post_init__(self): self.output_directory = self.output_directory or OutputDirectory( url=find_output_dirname(self.output_directory) ) - self.generate_model_pickle = True if self.generate_model_pickle or self.what_if_analysis else False + self.generate_model_pickle = ( + True if self.generate_model_pickle or self.what_if_analysis else False + ) self.metric = (self.metric or "").lower() or SupportedMetrics.SMAPE.lower() self.model = self.model or SupportedModels.Prophet self.confidence_interval_width = self.confidence_interval_width or 0.80 @@ -144,6 +156,21 @@ def __post_init__(self): self.generate_metrics = ( self.generate_metrics if self.generate_metrics is not None else True ) + self.generate_metrics_file = ( + self.generate_metrics_file + if self.generate_metrics_file is not None + else True + ) + self.generate_forecast_file = ( + self.generate_forecast_file + if self.generate_forecast_file is not None + else True + ) + self.generate_explanations_file = ( + self.generate_explanations_file + if self.generate_explanations_file is not None + else True + ) # For Explanations Generation. When user doesn't specify defaults to False self.generate_explanations = ( self.generate_explanations diff --git a/docs/source/user_guide/operators/forecast_operator/development.rst b/docs/source/user_guide/operators/forecast_operator/development.rst index aec94d081..e40e02777 100644 --- a/docs/source/user_guide/operators/forecast_operator/development.rst +++ b/docs/source/user_guide/operators/forecast_operator/development.rst @@ -125,6 +125,35 @@ Before running operators on a job, users must configure their output directory. horizon: 3 target_column: y + +Exclude Writing Certain Output Files +==================================== + +You can choose to exclude certain files from being written to the output folder. This may be because you are calling the API, and not using the output folder. The yaml options below are ``True`` by default, but can be set to ``False`` to prevent file generation. + +.. code-block:: yaml + + kind: operator + type: forecast + version: v1 + spec: + datetime_column: + name: ds + historical_data: + url: oci://@/example_yosemite_temps.csv + output_directory: + url: oci://@/my_results/ + horizon: 3 + target_column: y + generate_report: True + generate_forecast_file: False + generate_metrics_file: False + generate_explanations: True + generate_explanations_file: False + +The above example will save a report.html to ``oci://@/my_results/``, but it will NOT save other files. 
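If the output files are disabled because you are consuming the results through the Python API, the same artifacts remain available in memory on the returned ``ForecastResults`` object. Below is a minimal sketch of that pattern, mirroring the usage in the operator's test suite; the import paths are the internal ones used by those tests (not a stable public interface), and ``operator_yaml`` is a placeholder name for the full operator YAML loaded as a dict.

.. code-block:: python

    from ads.opctl.operator.lowcode.forecast.__main__ import operate
    from ads.opctl.operator.lowcode.forecast.operator_config import (
        ForecastOperatorConfig,
    )

    # operator_yaml is the whole operator YAML (kind/type/version/spec) as a dict,
    # e.g. with generate_forecast_file and generate_metrics_file set to False in spec.
    operator_config = ForecastOperatorConfig.from_dict(operator_yaml)
    results = operate(operator_config)

    # The in-memory results are populated even when the CSV outputs are disabled.
    forecast_df = results.get_forecast()
    metrics_df = results.get_metrics()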
+ + Ingesting and Interpreting Outputs ================================== diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py index f35abff50..66750d6e6 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -921,5 +921,40 @@ def test_report_title(operator_setup, model): assert False, "Report Title was not set" +@pytest.mark.parametrize("model", ["prophet"]) +def test_generate_files(operator_setup, model): + from ads.opctl.operator.lowcode.forecast.__main__ import operate + from ads.opctl.operator.lowcode.forecast.operator_config import ( + ForecastOperatorConfig, + ) + + yaml_i = TEMPLATE_YAML.copy() + yaml_i["spec"]["horizon"] = 10 + yaml_i["spec"]["model"] = model + yaml_i["spec"]["historical_data"] = {"format": "pandas"} + yaml_i["spec"]["target_column"] = TARGET_COL.name + yaml_i["spec"]["datetime_column"]["name"] = HISTORICAL_DATETIME_COL.name + yaml_i["spec"]["report_title"] = "Skibidi ADS Skibidi" + yaml_i["spec"]["output_directory"]["url"] = operator_setup + yaml_i["spec"]["generate_explanations_file"] = False + yaml_i["spec"]["generate_forecast_file"] = False + yaml_i["spec"]["generate_metrics_file"] = False + + df = pd.concat([HISTORICAL_DATETIME_COL[:15], TARGET_COL[:15]], axis=1) + yaml_i["spec"]["historical_data"]["data"] = df + operator_config = ForecastOperatorConfig.from_dict(yaml_i) + results = operate(operator_config) + files = os.listdir(operator_setup) + assert "report.html" in files, "Failed to generate report" + assert ( + "forecast.csv" not in files + ), "Generated forecast file, but `generate_forecast_file` was set False" + assert ( + "metrics.csv" not in files + ), "Generated metrics file, but `generate_metrics_file` was set False" + assert not results.get_forecast().empty + assert not results.get_metrics().empty + + if __name__ == "__main__": pass From fba3318a62814fa53562619543a9da6988f36686 Mon Sep 17 00:00:00 2001 From: Allen Date: Fri, 14 Mar 2025 22:26:54 +0000 Subject: [PATCH 05/18] typo in tests --- tests/operators/forecast/test_errors.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py index 62f1cd132..9104ba7a5 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -738,10 +738,11 @@ def test_pandas_historical_input(operator_setup, model): ForecastOperatorConfig, ) - tmpdirname = operator_setup - historical_data_path, additional_data_path = setup_artificial_data() + historical_data_path, additional_data_path, _ = setup_artificial_data( + operator_setup + ) yaml_i, output_data_path = populate_yaml( - tmpdirname=tmpdirname, + operator_setup=operator_setup, historical_data_path=historical_data_path, additional_data_path=additional_data_path, ) @@ -755,7 +756,7 @@ def test_pandas_historical_input(operator_setup, model): operator_config = ForecastOperatorConfig.from_dict(yaml_i) operate(operator_config) assert pd.read_csv(additional_data_path)["Date"].equals( - pd.read_csv(f"{tmpdirname}/results/forecast.csv")["Date"] + pd.read_csv(f"{operator_setup}/results/forecast.csv")["Date"] ) From 59df305cb70e67feb9fca5353ab1db1b84c6653f Mon Sep 17 00:00:00 2001 From: Allen Date: Sat, 15 Mar 2025 09:26:18 +0000 Subject: [PATCH 06/18] typo --- tests/operators/forecast/test_errors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py 
index 9104ba7a5..b9fed206c 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -742,7 +742,7 @@ def test_pandas_historical_input(operator_setup, model): operator_setup ) yaml_i, output_data_path = populate_yaml( - operator_setup=operator_setup, + tmpdirname=operator_setup, historical_data_path=historical_data_path, additional_data_path=additional_data_path, ) From 32e748e14a6f4ef502f052e6230edacb68442bb9 Mon Sep 17 00:00:00 2001 From: Allen Date: Sat, 15 Mar 2025 09:52:44 +0000 Subject: [PATCH 07/18] improve unit test --- .../lowcode/forecast/model/base_model.py | 7 ++--- .../lowcode/forecast/operator_config.py | 15 +++++---- tests/operators/forecast/test_errors.py | 31 +++++++++++++++++-- 3 files changed, 38 insertions(+), 15 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/base_model.py b/ads/opctl/operator/lowcode/forecast/model/base_model.py index dae92ec85..8dc1653eb 100644 --- a/ads/opctl/operator/lowcode/forecast/model/base_model.py +++ b/ads/opctl/operator/lowcode/forecast/model/base_model.py @@ -153,9 +153,8 @@ def generate_report(self): model_description, other_sections, ) = self._generate_report() - report_title = self.config.spec.report_title or "Forecast Report" header_section = rc.Block( - rc.Heading(report_title, level=1), + rc.Heading(self.spec.report_title, level=1), rc.Text( f"You selected the {self.spec.model} model.\nBased on your dataset, you could have also selected any of the models: {SupportedModels.keys()}." ), @@ -572,7 +571,7 @@ def _save_report( if self.spec.generate_explanations: try: if not self.formatted_global_explanation.empty: - if not self.spec.generate_explanations_file: + if self.spec.generate_explanation_files: write_data( data=self.formatted_global_explanation, filename=os.path.join( @@ -589,7 +588,7 @@ def _save_report( ) if not self.formatted_local_explanation.empty: - if not self.spec.generate_explanations_file: + if self.spec.generate_explanation_files: write_data( data=self.formatted_local_explanation, filename=os.path.join( diff --git a/ads/opctl/operator/lowcode/forecast/operator_config.py b/ads/opctl/operator/lowcode/forecast/operator_config.py index 2c74eb8af..23ec5b959 100644 --- a/ads/opctl/operator/lowcode/forecast/operator_config.py +++ b/ads/opctl/operator/lowcode/forecast/operator_config.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright (c) 2023, 2024 Oracle and/or its affiliates. +# Copyright (c) 2023, 2025 Oracle and/or its affiliates. 
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ import os @@ -117,7 +117,7 @@ class ForecastOperatorSpec(DataClassSerializable): generate_metrics: bool = None generate_metrics_file: bool = None generate_explanations: bool = None - generate_explanations_file: bool = None + generate_explanation_files: bool = None explanations_accuracy_mode: str = None horizon: int = None model: str = None @@ -136,9 +136,7 @@ def __post_init__(self): self.output_directory = self.output_directory or OutputDirectory( url=find_output_dirname(self.output_directory) ) - self.generate_model_pickle = ( - True if self.generate_model_pickle or self.what_if_analysis else False - ) + self.generate_model_pickle = self.generate_model_pickle or self.what_if_analysis self.metric = (self.metric or "").lower() or SupportedMetrics.SMAPE.lower() self.model = self.model or SupportedModels.Prophet self.confidence_interval_width = self.confidence_interval_width or 0.80 @@ -166,9 +164,9 @@ def __post_init__(self): if self.generate_forecast_file is not None else True ) - self.generate_explanations_file = ( - self.generate_explanations_file - if self.generate_explanations_file is not None + self.generate_explanation_files = ( + self.generate_explanation_files + if self.generate_explanation_files is not None else True ) # For Explanations Generation. When user doesn't specify defaults to False @@ -191,6 +189,7 @@ def __post_init__(self): if self.generate_model_pickle is not None else False ) + self.report_title = self.report_title or "Forecast Report" self.report_theme = self.report_theme or "light" self.metrics_filename = self.metrics_filename or "metrics.csv" self.test_metrics_filename = self.test_metrics_filename or "test_metrics.csv" diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py index 66750d6e6..b0e109012 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -929,19 +929,24 @@ def test_generate_files(operator_setup, model): ) yaml_i = TEMPLATE_YAML.copy() - yaml_i["spec"]["horizon"] = 10 + yaml_i["spec"]["horizon"] = 3 yaml_i["spec"]["model"] = model yaml_i["spec"]["historical_data"] = {"format": "pandas"} + yaml_i["spec"]["additional_data"] = {"format": "pandas"} yaml_i["spec"]["target_column"] = TARGET_COL.name yaml_i["spec"]["datetime_column"]["name"] = HISTORICAL_DATETIME_COL.name - yaml_i["spec"]["report_title"] = "Skibidi ADS Skibidi" yaml_i["spec"]["output_directory"]["url"] = operator_setup - yaml_i["spec"]["generate_explanations_file"] = False + yaml_i["spec"]["generate_explanation_files"] = False yaml_i["spec"]["generate_forecast_file"] = False yaml_i["spec"]["generate_metrics_file"] = False + yaml_i["spec"]["generate_explanations"] = True df = pd.concat([HISTORICAL_DATETIME_COL[:15], TARGET_COL[:15]], axis=1) + df_add = pd.concat([HISTORICAL_DATETIME_COL[:18], ADD_COLS[:18]], axis=1) + print(f"df: {df}") + print(f"df_add: {df_add}") yaml_i["spec"]["historical_data"]["data"] = df + yaml_i["spec"]["additional_data"]["data"] = df_add operator_config = ForecastOperatorConfig.from_dict(yaml_i) results = operate(operator_config) files = os.listdir(operator_setup) @@ -952,8 +957,28 @@ def test_generate_files(operator_setup, model): assert ( "metrics.csv" not in files ), "Generated metrics file, but `generate_metrics_file` was set False" + assert ( + "local_explanations.csv" not in files + ), "Generated metrics file, but `generate_explanation_files` was set False" + assert ( + 
"global_explanations.csv" not in files + ), "Generated metrics file, but `generate_explanation_files` was set False" assert not results.get_forecast().empty assert not results.get_metrics().empty + assert not results.get_global_explanations().empty + assert not results.get_local_explanations().empty + + yaml_i["spec"].pop("generate_explanation_files") + yaml_i["spec"].pop("generate_forecast_file") + yaml_i["spec"].pop("generate_metrics_file") + operator_config = ForecastOperatorConfig.from_dict(yaml_i) + results = operate(operator_config) + files = os.listdir(operator_setup) + assert "report.html" in files, "Failed to generate report" + assert "forecast.csv" in files, "Failed to generate forecast file" + assert "metrics.csv" in files, "Failed to generated metrics file" + assert "local_explanation.csv" in files, "Failed to generated local expl file" + assert "global_explanation.csv" in files, "Failed to generated global expl file" if __name__ == "__main__": From 92e22f8a0e859a88479768f4ed78303d9c48425d Mon Sep 17 00:00:00 2001 From: Allen Date: Sat, 15 Mar 2025 10:43:10 +0000 Subject: [PATCH 08/18] fix horizon len --- tests/operators/forecast/test_errors.py | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py index b9fed206c..c892ed8b7 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -26,6 +26,7 @@ ForecastSchemaYamlError, ForecastInputDataError, ) +from ads.opctl.operator.lowcode.forecast.operator_config import ForecastOperatorConfig from ads.opctl.operator.lowcode.forecast.utils import smape from ads.opctl.operator.cmd import run @@ -731,12 +732,6 @@ def test_smape_error(): @pytest.mark.parametrize("model", ["prophet"]) def test_pandas_historical_input(operator_setup, model): from ads.opctl.operator.lowcode.forecast.__main__ import operate - from ads.opctl.operator.lowcode.forecast.model.forecast_datasets import ( - ForecastDatasets, - ) - from ads.opctl.operator.lowcode.forecast.operator_config import ( - ForecastOperatorConfig, - ) historical_data_path, additional_data_path, _ = setup_artificial_data( operator_setup @@ -746,7 +741,7 @@ def test_pandas_historical_input(operator_setup, model): historical_data_path=historical_data_path, additional_data_path=additional_data_path, ) - yaml_i["spec"]["horizon"] = 10 + yaml_i["spec"]["horizon"] = HORIZON yaml_i["spec"]["model"] = model df = pd.read_csv(historical_data_path) yaml_i["spec"]["historical_data"].pop("url") @@ -763,12 +758,6 @@ def test_pandas_historical_input(operator_setup, model): @pytest.mark.parametrize("model", ["prophet"]) def test_pandas_additional_input(operator_setup, model): from ads.opctl.operator.lowcode.forecast.__main__ import operate - from ads.opctl.operator.lowcode.forecast.model.forecast_datasets import ( - ForecastDatasets, - ) - from ads.opctl.operator.lowcode.forecast.operator_config import ( - ForecastOperatorConfig, - ) tmpdirname = operator_setup historical_data_path, additional_data_path = setup_small_rossman() @@ -898,9 +887,6 @@ def test_auto_select(operator_setup): @pytest.mark.parametrize("model", ["prophet"]) def test_prophet_floor_cap(operator_setup, model): from ads.opctl.operator.lowcode.forecast.__main__ import operate - from ads.opctl.operator.lowcode.forecast.operator_config import ( - ForecastOperatorConfig, - ) yaml_i = TEMPLATE_YAML.copy() yaml_i["spec"]["horizon"] = 10 From 3c6e945f7c524f0f30f1d48e935c4176ed4277c9 Mon Sep 17 
00:00:00 2001 From: Allen Date: Sat, 15 Mar 2025 11:28:18 +0000 Subject: [PATCH 09/18] merge error in unit test --- tests/operators/forecast/test_errors.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py index 70ba4fb75..37bbc39f7 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -914,8 +914,9 @@ def test_prophet_floor_cap(operator_setup, model): yaml_i["spec"]["model"] = model yaml_i["spec"]["historical_data"] = {"format": "pandas"} yaml_i["spec"]["datetime_column"]["name"] = HISTORICAL_DATETIME_COL.name - yaml_i["spec"]["output_directory"]["url"] = operator_setup + # yaml_i["spec"]["output_directory"]["url"] = operator_setup yaml_i["spec"]["target_column"] = "target" + yaml_i["spec"]["model_kwargs"] = {"min": 0, "max": 20} target_column = pd.Series(np.arange(20, -6, -2), name="target") df = pd.concat( From ea9258b8a72d4c93fbd53fd1374f9be81681ae17 Mon Sep 17 00:00:00 2001 From: Allen Date: Sat, 15 Mar 2025 12:33:06 +0000 Subject: [PATCH 10/18] debug test failing remotely but passing localy --- tests/operators/forecast/test_errors.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py index 37bbc39f7..d8025d97d 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -914,7 +914,7 @@ def test_prophet_floor_cap(operator_setup, model): yaml_i["spec"]["model"] = model yaml_i["spec"]["historical_data"] = {"format": "pandas"} yaml_i["spec"]["datetime_column"]["name"] = HISTORICAL_DATETIME_COL.name - # yaml_i["spec"]["output_directory"]["url"] = operator_setup + yaml_i["spec"]["output_directory"]["url"] = operator_setup yaml_i["spec"]["target_column"] = "target" yaml_i["spec"]["model_kwargs"] = {"min": 0, "max": 20} @@ -969,6 +969,10 @@ def test_generate_files(operator_setup, model): operator_config = ForecastOperatorConfig.from_dict(yaml_i) results = operate(operator_config) files = os.listdir(operator_setup) + if "errors.json" in files: + with open(os.path.join(operator_setup, "errors.json")) as f: + print(f"Errors in build! {f.read()}") + assert False, "Failed due to errors.json being created" assert "report.html" in files, "Failed to generate report" assert ( "forecast.csv" not in files @@ -984,6 +988,7 @@ def test_generate_files(operator_setup, model): ), "Generated metrics file, but `generate_explanation_files` was set False" assert not results.get_forecast().empty assert not results.get_metrics().empty + print(f"global expl: {results.get_global_explanations()}") assert not results.get_global_explanations().empty assert not results.get_local_explanations().empty @@ -993,6 +998,10 @@ def test_generate_files(operator_setup, model): operator_config = ForecastOperatorConfig.from_dict(yaml_i) results = operate(operator_config) files = os.listdir(operator_setup) + if "errors.json" in files: + with open(os.path.join(operator_setup, "errors.json")) as f: + print(f"Errors in build! 
{f.read()}") + assert False, "Failed due to errors.json being created" assert "report.html" in files, "Failed to generate report" assert "forecast.csv" in files, "Failed to generate forecast file" assert "metrics.csv" in files, "Failed to generated metrics file" From 48b27b3c21591bc8663d24cff47786ed5dda9bfe Mon Sep 17 00:00:00 2001 From: Allen Date: Sun, 16 Mar 2025 15:41:54 +0000 Subject: [PATCH 11/18] update prophet explanations --- .../lowcode/forecast/model/base_model.py | 4 +- .../lowcode/forecast/model/neuralprophet.py | 8 ++- .../lowcode/forecast/model/prophet.py | 51 ++++++++++++------- tests/operators/forecast/test_errors.py | 5 +- 4 files changed, 40 insertions(+), 28 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/base_model.py b/ads/opctl/operator/lowcode/forecast/model/base_model.py index 78dc47926..b42d336f7 100644 --- a/ads/opctl/operator/lowcode/forecast/model/base_model.py +++ b/ads/opctl/operator/lowcode/forecast/model/base_model.py @@ -635,7 +635,9 @@ def _save_report( ) if self.errors_dict: write_data( - data=pd.DataFrame.from_dict(self.errors_dict), + data=pd.DataFrame( + self.errors_dict, index=np.arange(len(self.errors_dict.keys())) + ), filename=os.path.join( unique_output_dir, self.spec.errors_dict_filename ), diff --git a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py index 5f33c72de..d52a5230c 100644 --- a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright (c) 2023, 2024 Oracle and/or its affiliates. +# Copyright (c) 2023, 2025 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ import logging @@ -153,10 +153,8 @@ def _train_model(self, i, s_id, df, model_kwargs): cols_to_read = filter( lambda x: x.startswith("future_regressor"), forecast.columns ) - self.explanations_info[s_id] = forecast[cols_to_read] - self.explanations_info[s_id]["Date"] = forecast["ds"] - self.explanations_info[s_id] = self.explanations_info[s_id].set_index( - "Date" + self.explanations_info[s_id] = ( + forecast[cols_to_read].rename({"ds": "Date"}, axis=1).set_index("Date") ) self.outputs[s_id] = forecast diff --git a/ads/opctl/operator/lowcode/forecast/model/prophet.py b/ads/opctl/operator/lowcode/forecast/model/prophet.py index 1f9d62471..b48976960 100644 --- a/ads/opctl/operator/lowcode/forecast/model/prophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/prophet.py @@ -22,7 +22,6 @@ from ..const import ( DEFAULT_TRIALS, PROPHET_INTERNAL_DATE_COL, - ForecastOutputColumns, SupportedModels, ) from .base_model import ForecastOperatorBaseModel @@ -123,6 +122,14 @@ def _train_model(self, i, series_id, df, model_kwargs): upper_bound=self.get_horizon(forecast["yhat_upper"]).values, lower_bound=self.get_horizon(forecast["yhat_lower"]).values, ) + # Get all features that make up the forecast. 
Exclude CI (upper/lower) and drop yhat ([:-1]) + core_columns = forecast.columns[ + ~forecast.columns.str.endswith("_lower") + & ~forecast.columns.str.endswith("_upper") + ][:-1] + self.explanations_info[series_id] = ( + forecast[core_columns].rename({"ds": "Date"}, axis=1).set_index("Date") + ) self.models[series_id] = {} self.models[series_id]["model"] = model @@ -151,6 +158,7 @@ def _build_model(self) -> pd.DataFrame: full_data_dict = self.datasets.get_data_by_series() self.models = {} self.outputs = {} + self.explanations_info = {} self.additional_regressors = self.datasets.get_additional_data_column_names() model_kwargs = self.set_kwargs() self.forecast_output = ForecastOutput( @@ -257,6 +265,25 @@ def objective(trial): model_kwargs_i = study.best_params return model_kwargs_i + def explain_model(self): + self.local_explanation = {} + global_expl = [] + + for s_id, expl_df in self.explanations_info.items(): + # Local Expl + self.local_explanation[s_id] = self.get_horizon(expl_df) + self.local_explanation[s_id]["Series"] = s_id + self.local_explanation[s_id].index.rename(self.dt_column_name, inplace=True) + # Global Expl + g_expl = self.drop_horizon(expl_df).mean() + g_expl.name = s_id + global_expl.append(g_expl) + self.global_explanation = pd.concat(global_expl, axis=1) + self.formatted_global_explanation = ( + self.global_explanation / self.global_explanation.sum(axis=0) * 100 + ) + self.formatted_local_explanation = pd.concat(self.local_explanation.values()) + def _generate_report(self): import report_creator as rc from prophet.plot import add_changepoints_to_plot @@ -335,22 +362,6 @@ def _generate_report(self): # If the key is present, call the "explain_model" method self.explain_model() - # Convert the global explanation data to a DataFrame - global_explanation_df = pd.DataFrame(self.global_explanation) - - self.formatted_global_explanation = ( - global_explanation_df / global_explanation_df.sum(axis=0) * 100 - ) - - aggregate_local_explanations = pd.DataFrame() - for s_id, local_ex_df in self.local_explanation.items(): - local_ex_df_copy = local_ex_df.copy() - local_ex_df_copy[ForecastOutputColumns.SERIES] = s_id - aggregate_local_explanations = pd.concat( - [aggregate_local_explanations, local_ex_df_copy], axis=0 - ) - self.formatted_local_explanation = aggregate_local_explanations - if not self.target_cat_col: self.formatted_global_explanation = ( self.formatted_global_explanation.rename( @@ -364,7 +375,7 @@ def _generate_report(self): # Create a markdown section for the global explainability global_explanation_section = rc.Block( - rc.Heading("Global Explanation of Models", level=2), + rc.Heading("Global Explainability", level=2), rc.Text( "The following tables provide the feature attribution for the global explainability." 
), @@ -373,7 +384,7 @@ def _generate_report(self): blocks = [ rc.DataTable( - local_ex_df.div(local_ex_df.abs().sum(axis=1), axis=0) * 100, + local_ex_df.drop("Series", axis=1), label=s_id if self.target_cat_col else None, index=True, ) @@ -393,6 +404,8 @@ def _generate_report(self): # Do not fail the whole run due to explanations failure logger.warning(f"Failed to generate Explanations with error: {e}.") logger.debug(f"Full Traceback: {traceback.format_exc()}") + self.errors_dict["explainer_error"] = str(e) + self.errors_dict["explainer_error_error"] = traceback.format_exc() model_description = rc.Text( """Prophet is a procedure for forecasting time series data based on an additive model where non-linear trends are fit with yearly, weekly, and daily seasonality, plus holiday effects. It works best with time series that have strong seasonal effects and several seasons of historical data. Prophet is robust to missing data and shifts in the trend, and typically handles outliers well.""" diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py index d8025d97d..245d0dc8e 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -961,6 +961,7 @@ def test_generate_files(operator_setup, model): yaml_i["spec"]["generate_forecast_file"] = False yaml_i["spec"]["generate_metrics_file"] = False yaml_i["spec"]["generate_explanations"] = True + yaml_i["spec"]["model_kwargs"] = {"min": 0, "max": 20} df = pd.concat([HISTORICAL_DATETIME_COL[:15], TARGET_COL[:15]], axis=1) df_add = pd.concat([HISTORICAL_DATETIME_COL[:18], ADD_COLS[:18]], axis=1) @@ -971,8 +972,7 @@ def test_generate_files(operator_setup, model): files = os.listdir(operator_setup) if "errors.json" in files: with open(os.path.join(operator_setup, "errors.json")) as f: - print(f"Errors in build! 
{f.read()}") - assert False, "Failed due to errors.json being created" + assert False, f"Failed due to errors.json being created: {f.read()}" assert "report.html" in files, "Failed to generate report" assert ( "forecast.csv" not in files @@ -988,7 +988,6 @@ def test_generate_files(operator_setup, model): ), "Generated metrics file, but `generate_explanation_files` was set False" assert not results.get_forecast().empty assert not results.get_metrics().empty - print(f"global expl: {results.get_global_explanations()}") assert not results.get_global_explanations().empty assert not results.get_local_explanations().empty From 8a942c3208d7def3e4084bcb2735b281f399a793 Mon Sep 17 00:00:00 2001 From: Allen Date: Sun, 16 Mar 2025 16:19:16 +0000 Subject: [PATCH 12/18] remove extra cols from explainability --- ads/opctl/operator/lowcode/forecast/model/prophet.py | 12 +++++++++++- tests/operators/forecast/test_errors.py | 12 ++++++++---- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/prophet.py b/ads/opctl/operator/lowcode/forecast/model/prophet.py index b48976960..6a1f08779 100644 --- a/ads/opctl/operator/lowcode/forecast/model/prophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/prophet.py @@ -127,8 +127,18 @@ def _train_model(self, i, series_id, df, model_kwargs): ~forecast.columns.str.endswith("_lower") & ~forecast.columns.str.endswith("_upper") ][:-1] + core_columns = set(core_columns) - set( + "additive_terms", + "extra_regressors_additive", + "multiplicative_terms", + "extra_regressors_multiplicative", + "cap", + "floor", + ) self.explanations_info[series_id] = ( - forecast[core_columns].rename({"ds": "Date"}, axis=1).set_index("Date") + forecast[list(core_columns)] + .rename({"ds": "Date"}, axis=1) + .set_index("Date") ) self.models[series_id] = {} diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py index 245d0dc8e..ed8d93eb0 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -969,9 +969,11 @@ def test_generate_files(operator_setup, model): yaml_i["spec"]["additional_data"]["data"] = df_add operator_config = ForecastOperatorConfig.from_dict(yaml_i) results = operate(operator_config) - files = os.listdir(operator_setup) + files = os.listdir(yaml_i["spec"]["output_directory"]["url"]) if "errors.json" in files: - with open(os.path.join(operator_setup, "errors.json")) as f: + with open( + os.path.join(yaml_i["spec"]["output_directory"]["url"], "errors.json") + ) as f: assert False, f"Failed due to errors.json being created: {f.read()}" assert "report.html" in files, "Failed to generate report" assert ( @@ -996,9 +998,11 @@ def test_generate_files(operator_setup, model): yaml_i["spec"].pop("generate_metrics_file") operator_config = ForecastOperatorConfig.from_dict(yaml_i) results = operate(operator_config) - files = os.listdir(operator_setup) + files = os.listdir(yaml_i["spec"]["output_directory"]["url"]) if "errors.json" in files: - with open(os.path.join(operator_setup, "errors.json")) as f: + with open( + os.path.join(yaml_i["spec"]["output_directory"]["url"], "errors.json") + ) as f: print(f"Errors in build! 
{f.read()}") assert False, "Failed due to errors.json being created" assert "report.html" in files, "Failed to generate report" From 0af84eb3dc344a02279443a41bfb9d6bfc061f6d Mon Sep 17 00:00:00 2001 From: Allen Date: Mon, 17 Mar 2025 12:20:05 +0000 Subject: [PATCH 13/18] combine columns to match old strucutre --- .../lowcode/forecast/model/prophet.py | 25 ++++++++++++++++--- tests/operators/forecast/test_explainers.py | 4 +++ 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/prophet.py b/ads/opctl/operator/lowcode/forecast/model/prophet.py index 6a1f08779..a0e16cb7b 100644 --- a/ads/opctl/operator/lowcode/forecast/model/prophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/prophet.py @@ -122,24 +122,41 @@ def _train_model(self, i, series_id, df, model_kwargs): upper_bound=self.get_horizon(forecast["yhat_upper"]).values, lower_bound=self.get_horizon(forecast["yhat_lower"]).values, ) - # Get all features that make up the forecast. Exclude CI (upper/lower) and drop yhat ([:-1]) + # Get all features that make up the forecast. Exclude CI (upper/lower) core_columns = forecast.columns[ ~forecast.columns.str.endswith("_lower") & ~forecast.columns.str.endswith("_upper") - ][:-1] - core_columns = set(core_columns) - set( + ] + core_columns = set(core_columns) - { "additive_terms", "extra_regressors_additive", "multiplicative_terms", "extra_regressors_multiplicative", "cap", "floor", + "yhat", + } + combine_terms = list( + core_columns.intersection( + { + "trend", + "daily", + "weekly", + "yearly", + "monthly", + "holidays", + "zeros", + } + ) ) - self.explanations_info[series_id] = ( + + temp_df = ( forecast[list(core_columns)] .rename({"ds": "Date"}, axis=1) .set_index("Date") ) + temp_df[self.spec.target_column] = temp_df[combine_terms].sum(axis=1) + self.explanations_info[series_id] = temp_df.drop(combine_terms, axis=1) self.models[series_id] = {} self.models[series_id]["model"] = model diff --git a/tests/operators/forecast/test_explainers.py b/tests/operators/forecast/test_explainers.py index ee175eb60..3d8a73837 100644 --- a/tests/operators/forecast/test_explainers.py +++ b/tests/operators/forecast/test_explainers.py @@ -221,6 +221,10 @@ def test_explanations_filenames(model, num_series): operator_config.spec.local_explanation_filename = local_explanation_filename results = forecast_operate(operator_config) + assert ( + not results.get_global_explanations().empty + ), "Error generating Global Expl" + assert not results.get_local_explanations().empty, "Error generating Local Expl" global_explanation_path = os.path.join( output_directory, global_explanation_filename From 3720e7dd1a70266e153881821047f9516afd0628 Mon Sep 17 00:00:00 2001 From: Allen Date: Mon, 17 Mar 2025 13:50:18 +0000 Subject: [PATCH 14/18] update NP too --- .../lowcode/forecast/model/neuralprophet.py | 25 +++++++++++-------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py index d52a5230c..1e3517188 100644 --- a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py @@ -150,12 +150,22 @@ def _train_model(self, i, s_id, df, model_kwargs): logger.debug(forecast.tail()) # TODO; could also extract trend and seasonality? 
- cols_to_read = filter( - lambda x: x.startswith("future_regressor"), forecast.columns + cols_to_read = set( + forecast.columns[forecast.columns.str.startswith("future_regressor")] + + ["ds", "trend"] ) - self.explanations_info[s_id] = ( - forecast[cols_to_read].rename({"ds": "Date"}, axis=1).set_index("Date") + cols_to_read = cols_to_read - { + "future_regressors_additive", + "future_regressors_multiplicative", + } + combine_terms = cols_to_read - set(self.accepted_regressors[s_id]) + temp_df = ( + forecast[list(cols_to_read)] + .rename({"ds": "Date"}, axis=1) + .set_index("Date") ) + temp_df[self.spec.target_column] = temp_df[combine_terms].sum(axis=1) + self.explanations_info[s_id] = temp_df.drop(combine_terms, axis=1) self.outputs[s_id] = forecast self.forecast_output.populate_series_output( @@ -457,9 +467,7 @@ def explain_model(self): for s_id, expl_df in self.explanations_info.items(): expl_df = expl_df.rename(rename_cols, axis=1) # Local Expl - self.local_explanation[s_id] = self.get_horizon(expl_df).drop( - ["future_regressors_additive"], axis=1 - ) + self.local_explanation[s_id] = self.get_horizon(expl_df) self.local_explanation[s_id]["Series"] = s_id self.local_explanation[s_id].index.rename(self.dt_column_name, inplace=True) # Global Expl @@ -467,9 +475,6 @@ def explain_model(self): g_expl.name = s_id global_expl.append(g_expl) self.global_explanation = pd.concat(global_expl, axis=1) - self.global_explanation = self.global_explanation.drop( - index=["future_regressors_additive"], axis=0 - ) self.formatted_global_explanation = ( self.global_explanation / self.global_explanation.sum(axis=0) * 100 ) From ebf73e70748ba3d029308908029a3aa8a6e58cde Mon Sep 17 00:00:00 2001 From: Allen Date: Mon, 17 Mar 2025 16:30:08 +0000 Subject: [PATCH 15/18] polish NP code --- .../lowcode/forecast/model/neuralprophet.py | 54 ++++++++++--------- tests/operators/forecast/test_explainers.py | 2 +- 2 files changed, 31 insertions(+), 25 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py index 1e3517188..0afc92f78 100644 --- a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py @@ -149,36 +149,42 @@ def _train_model(self, i, s_id, df, model_kwargs): logger.debug(f"-----------------Model {i}----------------------") logger.debug(forecast.tail()) - # TODO; could also extract trend and seasonality? 
- cols_to_read = set( - forecast.columns[forecast.columns.str.startswith("future_regressor")] - + ["ds", "trend"] - ) - cols_to_read = cols_to_read - { - "future_regressors_additive", - "future_regressors_multiplicative", - } - combine_terms = cols_to_read - set(self.accepted_regressors[s_id]) - temp_df = ( - forecast[list(cols_to_read)] - .rename({"ds": "Date"}, axis=1) - .set_index("Date") - ) - temp_df[self.spec.target_column] = temp_df[combine_terms].sum(axis=1) - self.explanations_info[s_id] = temp_df.drop(combine_terms, axis=1) - self.outputs[s_id] = forecast + upper_bound_col_name = f"yhat1 {model_kwargs['quantiles'][1]*100}%" + lower_bound_col_name = f"yhat1 {model_kwargs['quantiles'][0]*100}%" self.forecast_output.populate_series_output( series_id=s_id, fit_val=self.drop_horizon(forecast["yhat1"]).values, forecast_val=self.get_horizon(forecast["yhat1"]).values, - upper_bound=self.get_horizon( - forecast[f"yhat1 {model_kwargs['quantiles'][1]*100}%"] - ).values, - lower_bound=self.get_horizon( - forecast[f"yhat1 {model_kwargs['quantiles'][0]*100}%"] - ).values, + upper_bound=self.get_horizon(forecast[upper_bound_col_name]).values, + lower_bound=self.get_horizon(forecast[lower_bound_col_name]).values, ) + core_columns = set(forecast.columns) - set( + [ + "y", + "yhat1", + upper_bound_col_name, + lower_bound_col_name, + "future_regressors_additive", + "future_regressors_multiplicative", + ] + ) + exog_variables = set( + filter(lambda x: x.startswith("future_regressor_"), list(core_columns)) + ) + combine_terms = list(core_columns - exog_variables - set(["ds"])) + temp_df = ( + forecast[list(core_columns)] + .rename({"ds": "Date"}, axis=1) + .set_index("Date") + ) + if combine_terms: + temp_df[self.spec.target_column] = temp_df[combine_terms].sum(axis=1) + temp_df = temp_df.drop(combine_terms, axis=1) + else: + temp_df[self.spec.target_column] = 0 + # Todo: check for columns that were dropped, and set them to 0 + self.explanations_info[s_id] = temp_df self.trainers[s_id] = model.trainer self.models[s_id] = {} diff --git a/tests/operators/forecast/test_explainers.py b/tests/operators/forecast/test_explainers.py index 3d8a73837..f158302b0 100644 --- a/tests/operators/forecast/test_explainers.py +++ b/tests/operators/forecast/test_explainers.py @@ -197,7 +197,7 @@ def test_explanations_output_and_columns(model, freq, num_series): ), f"Column {column} missing in local explanations" -@pytest.mark.parametrize("model", MODELS) +@pytest.mark.parametrize("model", MODELS) # MODELS @pytest.mark.parametrize("num_series", [1]) def test_explanations_filenames(model, num_series): """ From 5225e65d5bdd92232e74924018f45fd3c05fb4b6 Mon Sep 17 00:00:00 2001 From: Allen Date: Mon, 17 Mar 2025 21:35:55 +0000 Subject: [PATCH 16/18] clean up error messaging --- ads/opctl/operator/lowcode/common/utils.py | 5 +++++ ads/opctl/operator/lowcode/forecast/model/arima.py | 5 +++-- ads/opctl/operator/lowcode/forecast/model/automlx.py | 7 ++++++- .../operator/lowcode/forecast/model/base_model.py | 10 +++------- .../operator/lowcode/forecast/model/neuralprophet.py | 1 - tests/operators/forecast/test_errors.py | 8 ++++---- 6 files changed, 21 insertions(+), 15 deletions(-) diff --git a/ads/opctl/operator/lowcode/common/utils.py b/ads/opctl/operator/lowcode/common/utils.py index 818dccb5d..8fb3f5a23 100644 --- a/ads/opctl/operator/lowcode/common/utils.py +++ b/ads/opctl/operator/lowcode/common/utils.py @@ -142,6 +142,11 @@ def write_data(data, filename, format, storage_options=None, index=False, **kwar ) +def 
write_json(json_dict, filename, storage_options=None): + with fsspec.open(filename, mode="w", **storage_options) as f: + f.write(json.dumps(json_dict)) + + def write_simple_json(data, path): if ObjectStorageDetails.is_oci_path(path): storage_options = default_signer() diff --git a/ads/opctl/operator/lowcode/forecast/model/arima.py b/ads/opctl/operator/lowcode/forecast/model/arima.py index 6639ba097..cd7271fec 100644 --- a/ads/opctl/operator/lowcode/forecast/model/arima.py +++ b/ads/opctl/operator/lowcode/forecast/model/arima.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -# Copyright (c) 2023, 2024 Oracle and/or its affiliates. +# Copyright (c) 2023, 2025 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ import logging @@ -132,11 +132,12 @@ def _train_model(self, i, s_id, df, model_kwargs): logger.debug("===========Done===========") except Exception as e: - self.errors_dict[s_id] = { + new_error = { "model_name": self.spec.model, "error": str(e), "error_trace": traceback.format_exc(), } + self.errors_dict[s_id] = new_error logger.warning(f"Encountered Error: {e}. Skipping.") logger.warning(traceback.format_exc()) diff --git a/ads/opctl/operator/lowcode/forecast/model/automlx.py b/ads/opctl/operator/lowcode/forecast/model/automlx.py index 4628e12e7..916d8538d 100644 --- a/ads/opctl/operator/lowcode/forecast/model/automlx.py +++ b/ads/opctl/operator/lowcode/forecast/model/automlx.py @@ -184,12 +184,17 @@ def _build_model(self) -> pd.DataFrame: "selected_model_params": model.selected_model_params_, } except Exception as e: - self.errors_dict[s_id] = { + new_error = { "model_name": self.spec.model, "error": str(e), "error_trace": traceback.format_exc(), } + if s_id in self.errors_dict: + self.errors_dict[s_id]["model_fitting"] = new_error + else: + self.errors_dict[s_id] = {"model_fitting": new_error} logger.warning(f"Encountered Error: {e}. Skipping.") + logger.warning(f"self.errors_dict[s_id]: {self.errors_dict[s_id]}") logger.warning(traceback.format_exc()) logger.debug("===========Forecast Generated===========") diff --git a/ads/opctl/operator/lowcode/forecast/model/base_model.py b/ads/opctl/operator/lowcode/forecast/model/base_model.py index b42d336f7..380687a59 100644 --- a/ads/opctl/operator/lowcode/forecast/model/base_model.py +++ b/ads/opctl/operator/lowcode/forecast/model/base_model.py @@ -28,6 +28,7 @@ merged_category_column_name, seconds_to_datetime, write_data, + write_json, ) from ads.opctl.operator.lowcode.forecast.utils import ( _build_metrics_df, @@ -634,17 +635,12 @@ def _save_report( f"The outputs have been successfully generated and placed into the directory: {unique_output_dir}." 
) if self.errors_dict: - write_data( - data=pd.DataFrame( - self.errors_dict, index=np.arange(len(self.errors_dict.keys())) - ), + write_json( + json_dict=self.errors_dict, filename=os.path.join( unique_output_dir, self.spec.errors_dict_filename ), - format="json", storage_options=storage_options, - index=True, - indent=4, ) results.set_errors_dict(self.errors_dict) else: diff --git a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py index 0afc92f78..91a533b17 100644 --- a/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py +++ b/ads/opctl/operator/lowcode/forecast/model/neuralprophet.py @@ -229,7 +229,6 @@ def _build_model(self) -> pd.DataFrame: self.models = {} self.trainers = {} self.outputs = {} - self.errors_dict = {} self.explanations_info = {} self.accepted_regressors = {} self.additional_regressors = self.datasets.get_additional_data_column_names() diff --git a/tests/operators/forecast/test_errors.py b/tests/operators/forecast/test_errors.py index ed8d93eb0..dd13356a4 100644 --- a/tests/operators/forecast/test_errors.py +++ b/tests/operators/forecast/test_errors.py @@ -255,7 +255,7 @@ def setup_rossman(): def setup_faulty_rossman(): curr_dir = pathlib.Path(__file__).parent.resolve() - data_folder = f"{curr_dir}/../data/" + data_folder = f"{curr_dir}/../data" historical_data_path = f"{data_folder}/rs_2_prim.csv" additional_data_path = f"{data_folder}/rs_2_add_encoded.csv" return historical_data_path, additional_data_path @@ -707,10 +707,10 @@ def test_arima_automlx_errors(operator_setup, model): error_content = json.load(error_file) assert ( "Input data does not have a consistent (in terms of diff) DatetimeIndex." - in error_content["13"]["error"] - ), "Error message mismatch" + in error_content["13"]["model_fitting"]["error"] + ), f"Error message mismatch: {error_content}" - if model not in ["autots"]: # , "lgbforecast" + if model not in ["autots", "automlx"]: # , "lgbforecast" if yaml_i["spec"].get("explanations_accuracy_mode") != "AUTOMLX": global_fn = f"{tmpdirname}/results/global_explanation.csv" assert os.path.exists( From a9c51ecfd42df850b8fc2e9dfe08d444c913830e Mon Sep 17 00:00:00 2001 From: Allen Date: Tue, 18 Mar 2025 11:56:42 +0000 Subject: [PATCH 17/18] release candidate 0 for 2.13.2 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 03da1f655..a6002de33 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ build-backend = "flit_core.buildapi" # Required name = "oracle_ads" # the install (PyPI) name; name for local build in [tool.flit.module] section below -version = "2.13.2" +version = "2.13.2rc0" # Optional description = "Oracle Accelerated Data Science SDK" From 9430c7ebab8092975ef5e9176fe2059fc592e5cf Mon Sep 17 00:00:00 2001 From: Allen Date: Tue, 18 Mar 2025 12:31:09 +0000 Subject: [PATCH 18/18] release candidate 1 for 2.13.2 --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index a6002de33..08b156330 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ build-backend = "flit_core.buildapi" # Required name = "oracle_ads" # the install (PyPI) name; name for local build in [tool.flit.module] section below -version = "2.13.2rc0" +version = "2.13.2rc1" # Optional description = "Oracle Accelerated Data Science SDK"
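A closing note on the reworked error reporting in patch 16: ``errors.json`` is now written with ``write_json`` as a plain nested dictionary keyed by series id, with per-stage sub-keys (for example ``model_fitting`` in the AutoMLx path). A minimal, illustrative sketch of reading it back is shown below; the series id ``"13"`` and the ``results/`` path are taken from the updated test and are placeholders only.

.. code-block:: python

    import json

    # Path is illustrative: errors.json is written to the operator's output directory
    # under the name configured by spec.errors_dict_filename.
    with open("results/errors.json") as f:
        errors = json.load(f)

    # With the per-stage nesting, a fitting failure for a series is found under
    # errors["<series_id>"]["model_fitting"] rather than directly under the series id.
    fit_error = errors["13"]["model_fitting"]
    print(fit_error["error"])        # short error message
    print(fit_error["error_trace"])  # full traceback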