From 91e0e8bbf028f1bac82bdf69aa76ce75fd7eb4ea Mon Sep 17 00:00:00 2001
From: Vikas Pandey
Date: Fri, 20 Jun 2025 00:50:11 +0530
Subject: [PATCH 1/3] Feature: Implement series-specific model selection using
 meta-features

- Enhanced AbstractData to support subsetting of data based on specified series.
- Introduced MetaSelector class to select the best forecasting model based on pre-learned meta-rules.
- Updated ForecastOperatorModelFactory to utilize MetaSelector for AUTO_SELECT_SERIES.
- Modified HistoricalData and AdditionalData classes to accept a subset parameter for filtering.
- Added new schema option for auto-selecting series in the forecast configuration.
- Improved data loading functions to handle subsetting effectively.
- Refactored model report generation to ensure unique filenames for outputs.
---
 ads/opctl/operator/lowcode/common/data.py     |   9 +-
 .../lowcode/common/transformations.py         | 207 ++++++++++++
 ads/opctl/operator/lowcode/common/utils.py    |   8 +
 .../operator/lowcode/forecast/__main__.py     |  48 ++-
 ads/opctl/operator/lowcode/forecast/const.py  |   1 +
 .../lowcode/forecast/meta_selector.py         | 312 ++++++++++++++++++
 .../lowcode/forecast/model/base_model.py      | 148 +++++++--
 .../lowcode/forecast/model/factory.py         |  27 +-
 .../forecast/model/forecast_datasets.py       |  51 ++-
 .../operator/lowcode/forecast/schema.yaml     |   1 +
 10 files changed, 767 insertions(+), 45 deletions(-)
 create mode 100644 ads/opctl/operator/lowcode/forecast/meta_selector.py

diff --git a/ads/opctl/operator/lowcode/common/data.py b/ads/opctl/operator/lowcode/common/data.py
index 08ef63706..32641c011 100644
--- a/ads/opctl/operator/lowcode/common/data.py
+++ b/ads/opctl/operator/lowcode/common/data.py
@@ -19,16 +19,21 @@ class AbstractData(ABC):
-    def __init__(self, spec, name="input_data", data=None):
+    def __init__(self, spec, name="input_data", data=None, subset=None):
         self.Transformations = Transformations
         self.data = None
         self._data_dict = dict()
         self.name = name
         self.spec = spec
+        self.subset = subset
         if data is not None:
             self.data = data
         else:
             self.load_transform_ingest_data(spec)
+        # Subset by series if requested
+        # if self.subset is not None and hasattr(self, 'data') and self.data is not None:
+        #     subset_str = [str(s) for s in self.subset]
+        #     self.data = self.data[self.data.index.get_level_values(DataColumns.Series).isin(subset_str)]

     def get_raw_data_by_cat(self, category):
         mapping = self._data_transformer.get_target_category_columns_map()
@@ -72,7 +77,7 @@ def get_data_for_series(self, series_id):
     def _load_data(self, data_spec, **kwargs):
         loading_start_time = time.time()
         try:
-            raw_data = load_data(data_spec)
+            raw_data = load_data(data_spec, subset=self.subset if self.subset else None, target_category_columns=self.spec.target_category_columns)
         except InvalidParameterError as e:
             e.args = e.args + (f"Invalid Parameter: {self.name}",)
             raise e
diff --git a/ads/opctl/operator/lowcode/common/transformations.py b/ads/opctl/operator/lowcode/common/transformations.py
index d9e4924c0..b5367fe19 100644
--- a/ads/opctl/operator/lowcode/common/transformations.py
+++ b/ads/opctl/operator/lowcode/common/transformations.py
@@ -294,3 +294,210 @@ def get_target_category_columns_map(self):
     def _fill_na(self, df: pd.DataFrame, na_value=0) -> pd.DataFrame:
         """Fill nans in dataframe"""
         return df.fillna(value=na_value)
+
+    def build_fforms_meta_features(self, data, target_col=None, group_cols=None):
+        """
+        Build FFORMS-style meta-features for each time series in the input DataFrame.
+
+        Parameters
+        ----------
+        data : pandas.DataFrame
+            Input DataFrame containing time series data
+        target_col : str, optional
+            Name of the target column to calculate meta-features for.
+            If None, uses the target column specified in dataset_info.
+        group_cols : list of str, optional
+            List of columns to group by before calculating meta-features.
+            If None, calculates features for the entire series.
+
+        Returns
+        -------
+        pandas.DataFrame
+            Meta-features DataFrame with one row per series (group columns preserved); the input DataFrame is left unchanged
+
+        References
+        ----------
+        Talagala, T. S., Hyndman, R. J., & Athanasopoulos, G. (2023).
+        Meta-learning how to forecast time series. Journal of Forecasting, 42(6), 1476-1501.
+        """
+        if not isinstance(data, pd.DataFrame):
+            raise ValueError("Input must be a pandas DataFrame")
+
+        # Use target column from dataset_info if not specified
+        if target_col is None:
+            target_col = self.target_column_name
+        if target_col not in data.columns:
+            raise ValueError(f"Target column '{target_col}' not found in DataFrame")
+
+        # Check if group_cols are provided and valid
+        if group_cols is not None:
+            if not isinstance(group_cols, list):
+                raise ValueError("group_cols must be a list of column names")
+            for col in group_cols:
+                if col not in data.columns:
+                    raise ValueError(f"Group column '{col}' not found in DataFrame")
+
+        # If group_cols is empty, fall back to target_category_columns; if both are empty, treat the entire DataFrame as a single series
+        if not group_cols:
+            group_cols = self.target_category_columns if self.target_category_columns else []
+
+        # Calculate meta-features for each series
+        def calculate_series_features(series):
+            """Calculate features for a single series"""
+            n = len(series)
+            values = series.values
+
+            # Basic statistics
+            mean = series.mean()
+            std = series.std()
+            variance = series.var()
+            skewness = series.skew()
+            kurtosis = series.kurtosis()
+            cv = std / mean if mean != 0 else np.inf
+
+            # Trend features
+            X = np.vstack([np.arange(n), np.ones(n)]).T
+            trend_coef = np.linalg.lstsq(X, values, rcond=None)[0][0]
+            trend_pred = X.dot(np.linalg.lstsq(X, values, rcond=None)[0])
+            residuals = values - trend_pred
+            std_residuals = np.std(residuals)
+
+            # Turning points
+            turning_points = 0
+            for i in range(1, n-1):
+                if (values[i-1] < values[i] and values[i] > values[i+1]) or \
+                   (values[i-1] > values[i] and values[i] < values[i+1]):
+                    turning_points += 1
+            turning_points_rate = turning_points / (n-2) if n > 2 else 0
+
+            # Serial correlation
+            acf1 = series.autocorr(lag=1) if n > 1 else 0
+            acf2 = series.autocorr(lag=2) if n > 2 else 0
+            acf10 = series.autocorr(lag=10) if n > 10 else 0
+
+            # Seasonality features
+            seasonal_strength = 0
+            seasonal_peak_strength = 0
+            if n >= 12:
+                seasonal_lags = [12, 24, 36]
+                seasonal_acfs = []
+                for lag in seasonal_lags:
+                    if n > lag:
+                        acf_val = series.autocorr(lag=lag)
+                        seasonal_acfs.append(abs(acf_val))
+                seasonal_peak_strength = max(seasonal_acfs) if seasonal_acfs else 0
+
+                ma = series.rolling(window=12, center=True).mean()
+                seasonal_comp = series - ma
+                seasonal_strength = 1 - np.var(seasonal_comp.dropna()) / np.var(series)
+
+            # Stability and volatility features
+            values_above_mean = values >= mean
+            crossing_points = np.sum(values_above_mean[1:] != values_above_mean[:-1])
+            crossing_rate = crossing_points / (n - 1) if n > 1 else 0
+
+            # First and second differences
+            diff1 = np.diff(values)
+            diff2 = np.diff(diff1) if len(diff1) > 1 else np.array([])
+
+            diff1_mean = np.mean(np.abs(diff1)) if len(diff1) > 0 else 0
+            diff1_var = np.var(diff1) if len(diff1) > 0 else 
0 + diff2_mean = np.mean(np.abs(diff2)) if len(diff2) > 0 else 0 + diff2_var = np.var(diff2) if len(diff2) > 0 else 0 + + # Nonlinearity features + if n > 3: + X = values[:-1].reshape(-1, 1) + y = values[1:] + X2 = X * X + X3 = X * X * X + X_aug = np.hstack([X, X2, X3]) + nonlinearity = np.linalg.lstsq(X_aug, y, rcond=None)[1][0] if len(y) > 0 else 0 + else: + nonlinearity = 0 + + # Long-term trend features + if n >= 10: + mid = n // 2 + trend_change = np.mean(values[mid:]) - np.mean(values[:mid]) + else: + trend_change = 0 + + # Step changes and spikes + step_changes = np.abs(diff1).max() if len(diff1) > 0 else 0 + spikes = np.sum(np.abs(values - mean) > 2 * std) / n if std != 0 else 0 + + # Hurst exponent and entropy + lag = min(10, n // 2) + variance_ratio = np.var(series.diff(lag)) / (lag * np.var(series.diff())) if n > lag else 0 + hurst = np.log(variance_ratio) / (2 * np.log(lag)) if variance_ratio > 0 and lag > 1 else 0 + + hist, _ = np.histogram(series, bins='auto', density=True) + entropy = -np.sum(hist[hist > 0] * np.log(hist[hist > 0])) + + return pd.Series({ + 'ts_n_obs': n, + 'ts_mean': mean, + 'ts_std': std, + 'ts_variance': variance, + 'ts_cv': cv, + 'ts_skewness': skewness, + 'ts_kurtosis': kurtosis, + 'ts_trend': trend_coef, + 'ts_trend_change': trend_change, + 'ts_std_residuals': std_residuals, + 'ts_turning_points_rate': turning_points_rate, + 'ts_seasonal_strength': seasonal_strength, + 'ts_seasonal_peak_strength': seasonal_peak_strength, + 'ts_acf1': acf1, + 'ts_acf2': acf2, + 'ts_acf10': acf10, + 'ts_crossing_rate': crossing_rate, + 'ts_diff1_mean': diff1_mean, + 'ts_diff1_variance': diff1_var, + 'ts_diff2_mean': diff2_mean, + 'ts_diff2_variance': diff2_var, + 'ts_nonlinearity': nonlinearity, + 'ts_step_max': step_changes, + 'ts_spikes_rate': spikes, + 'ts_hurst': hurst, + 'ts_entropy': entropy + }) + + # Create copy of input DataFrame + result_df = data.copy() + + if group_cols: + # Calculate features for each group + features = [] + # Sort by date within each group if date column exists + date_col = self.dt_column_name if self.dt_column_name else 'Date' + if date_col in data.columns: + data = data.sort_values([date_col] + group_cols) + + for name, group in data.groupby(group_cols): + # Sort group by date if exists + if date_col in group.columns: + group = group.sort_values(date_col) + group_features = calculate_series_features(group[target_col]) + if isinstance(name, tuple): + feature_row = dict(zip(group_cols, name)) + else: + feature_row = {group_cols[0]: name} + feature_row.update(group_features) + features.append(feature_row) + + # Create features DataFrame without merging + features_df = pd.DataFrame(features) + # Return only the meta-features DataFrame with group columns + return features_df + else: + # Sort by date if exists and calculate features for entire series + date_col = self.dt_column_name if self.dt_column_name else 'Date' + if date_col in data.columns: + data = data.sort_values(date_col) + features = calculate_series_features(data[target_col]) + # Return single row DataFrame with meta-features + return pd.DataFrame([features]) + + return result_df diff --git a/ads/opctl/operator/lowcode/common/utils.py b/ads/opctl/operator/lowcode/common/utils.py index 3578b7df3..dc32a7f8e 100644 --- a/ads/opctl/operator/lowcode/common/utils.py +++ b/ads/opctl/operator/lowcode/common/utils.py @@ -124,6 +124,14 @@ def load_data(data_spec, storage_options=None, **kwargs): data = data[columns] if limit: data = data[:limit] + # Filtering by subset if provided + subset = 
kwargs.get('subset', None) + if subset is not None: + target_category_columns = kwargs.get('target_category_columns', None) + mask = False + for col in target_category_columns: + mask = mask | data[col].isin(subset) + data = data[mask] return data diff --git a/ads/opctl/operator/lowcode/forecast/__main__.py b/ads/opctl/operator/lowcode/forecast/__main__.py index cad6153b7..148c9b154 100644 --- a/ads/opctl/operator/lowcode/forecast/__main__.py +++ b/ads/opctl/operator/lowcode/forecast/__main__.py @@ -3,17 +3,20 @@ # Copyright (c) 2023, 2025 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ +import copy import json import os import sys from typing import Dict, List +import pandas as pd import yaml from ads.opctl import logger from ads.opctl.operator.common.const import ENV_OPERATOR_ARGS from ads.opctl.operator.common.utils import _parse_input_args +from .const import AUTO_SELECT_SERIES from .model.forecast_datasets import ForecastDatasets, ForecastResults from .operator_config import ForecastOperatorConfig from .whatifserve import ModelDeploymentManager @@ -24,9 +27,48 @@ def operate(operator_config: ForecastOperatorConfig) -> ForecastResults: from .model.factory import ForecastOperatorModelFactory datasets = ForecastDatasets(operator_config) - results = ForecastOperatorModelFactory.get_model( - operator_config, datasets - ).generate_report() + model = ForecastOperatorModelFactory.get_model(operator_config, datasets) + + if operator_config.spec.model == AUTO_SELECT_SERIES and hasattr(operator_config.spec, 'meta_features'): + # For AUTO_SELECT_SERIES, handle each series with its specific model + meta_features = operator_config.spec.meta_features + results = ForecastResults() + results_df = pd.DataFrame() + elapsed_time = 0 + sub_model_list = [] + sub_results_list = [] + + # Group the data by selected model + for model_name in meta_features['selected_model'].unique(): + # Get series that use this model + series_groups = meta_features[meta_features['selected_model'] == model_name] + + # Create a sub-config for this model + sub_config = copy.deepcopy(operator_config) + sub_config.spec.model = model_name + + # Create sub-datasets for these series + sub_datasets = ForecastDatasets(operator_config, subset=series_groups[operator_config.spec.target_category_columns].values.flatten().tolist()) + + # Get and run the appropriate model + sub_model = ForecastOperatorModelFactory.get_model(sub_config, sub_datasets) + sub_result_df, sub_elapsed_time = sub_model.build_model() + sub_results = sub_model.generate_report(result_df=sub_result_df, elapsed_time=sub_elapsed_time) + sub_results_list.append(sub_results) + + # results_df = pd.concat([results_df, sub_result_df], ignore_index=True, axis=0) + # elapsed_time += sub_elapsed_time + # Merge all sub_results into a single ForecastResults object + if sub_results_list: + results = sub_results_list[0] + for sub_result in sub_results_list[1:]: + results.merge(sub_result) + else: + results = None + + else: + # For other cases, use the single selected model + results = model.generate_report() # saving to model catalog spec = operator_config.spec if spec.what_if_analysis and datasets.additional_data: diff --git a/ads/opctl/operator/lowcode/forecast/const.py b/ads/opctl/operator/lowcode/forecast/const.py index 36adfa694..d3d4040b0 100644 --- a/ads/opctl/operator/lowcode/forecast/const.py +++ b/ads/opctl/operator/lowcode/forecast/const.py @@ -89,4 +89,5 @@ class 
ForecastOutputColumns(ExtendedEnum): PROPHET_INTERNAL_DATE_COL = "ds" RENDER_LIMIT = 5000 AUTO_SELECT = "auto-select" +AUTO_SELECT_SERIES = "auto-select-series" BACKTEST_REPORT_NAME = "back_test.csv" diff --git a/ads/opctl/operator/lowcode/forecast/meta_selector.py b/ads/opctl/operator/lowcode/forecast/meta_selector.py new file mode 100644 index 000000000..18c243bad --- /dev/null +++ b/ads/opctl/operator/lowcode/forecast/meta_selector.py @@ -0,0 +1,312 @@ +#!/usr/bin/env python + +# Copyright (c) 2023, 2025 Oracle and/or its affiliates. +# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ + +import pandas as pd +import numpy as np +from ..common.transformations import Transformations + + +class MetaSelector: + """ + A class to select the best forecasting model for each series based on pre-learned meta-rules. + The rules are based on the meta-features calculated by the FFORMS approach. + """ + + def __init__(self): + """Initialize the MetaSelector with pre-learned meta rules""" + # Pre-learned rules based on meta-features + self._meta_rules = { + # Rule 1: Strong trend, weak seasonality → ARIMA + 'arima_0': { + 'conditions': [ + ('ts_trend', 'abs>=', 0.65), # Strong trend strength + ('ts_seasonal_strength', '<', 0.20), # Weak seasonality + ], + 'model': 'arima', + 'priority': 1 + }, + # Rule 2: Strong seasonality, long series → Prophet + 'prophet_0': { + 'conditions': [ + ('ts_seasonal_strength', '>=', 0.50), # Strong seasonality + ('ts_n_obs', '>=', 200), # Long series + ], + 'model': 'prophet', + 'priority': 2 + }, + # Rule 3: High entropy, low autocorrelation → AutoMLX + 'automlx_0': { + 'conditions': [ + ('ts_entropy', '>=', 4.0), # High entropy + ('ts_acf1', '<=', 0.30), # Low autocorrelation + ], + 'model': 'automlx', + 'priority': 3 + }, + # Rule 4: Strong seasonality with trend and changing patterns → Prophet + 'prophet_1': { + 'conditions': [ + ('ts_seasonal_strength', '>=', 0.3), # Strong seasonality + ('ts_trend', 'abs>=', 0.1), # Clear trend + ('ts_turning_points_rate', '>=', 0.2), # Multiple change points + ('ts_n_obs', '>=', 50), # Sufficient data + ('ts_step_max', '>=', 100), # Significant steps + ('ts_diff1_variance', '>=', 10) # Variable differences + ], + 'model': 'prophet', + 'priority': 4 + }, + # Rule 5: Multiple seasonality with nonlinear patterns → Prophet + 'prophet_2': { + 'conditions': [ + ('ts_seasonal_peak_strength', '>=', 0.4), # Strong peak seasonality + ('ts_seasonal_strength', '>=', 0.2), # Overall seasonality + ('ts_acf10', '>=', 0.2), # Long-term correlation + ('ts_entropy', '>=', 0.5), # Complex patterns + ('ts_crossing_rate', '>=', 0.3) # Frequent mean crossings + ], + 'model': 'prophet', + 'priority': 5 + }, + # Rule 6: Strong autocorrelation with stationary behavior → ARIMA + 'arima_1': { + 'conditions': [ + ('ts_acf1', '>=', 0.7), # Strong lag-1 correlation + ('ts_acf2', '>=', 0.5), # Strong lag-2 correlation + ('ts_seasonal_strength', '<', 0.3), # Weak seasonality + ('ts_std_residuals', '<', 500), # Stable residuals + ('ts_diff1_variance', '<', 100), # Stable first differences + ('ts_hurst', '>', -0.1) # Some persistence + ], + 'model': 'arima', + 'priority': 6 + }, + # Rule 7: Linear trend with moderate noise → ARIMA + 'arima_2': { + 'conditions': [ + ('ts_trend', 'abs>=', 0.15), # Clear trend + ('ts_trend_change', '<', 100), # Stable trend + ('ts_cv', '<', 0.4), # Low variation + ('ts_kurtosis', '<', 5), # Normal-like distribution + ('ts_nonlinearity', '<', 1e5) # Linear relationships + ], + 
'model': 'arima', + 'priority': 7 + }, + # Rule 8: Complex seasonality with high nonlinearity → NeuralProphet + 'neuralprophet_1': { + 'conditions': [ + ('ts_seasonal_peak_strength', '>=', 0.5), # Strong seasonal peaks + ('ts_nonlinearity', '>=', 1e6), # Nonlinear patterns + ('ts_n_obs', '>=', 200), # Long series + ('ts_entropy', '>=', 0.6), # Complex patterns + ('ts_diff2_variance', '>=', 50) # Variable acceleration + ], + 'model': 'neuralprophet', + 'priority': 8 + }, + # Rule 9: Multiple seasonal patterns with changing behavior → NeuralProphet + 'neuralprophet_2': { + 'conditions': [ + ('ts_seasonal_strength', '>=', 0.4), # Strong seasonality + ('ts_turning_points_rate', '>=', 0.3), # Many turning points + ('ts_skewness', 'abs>=', 1), # Skewed distribution + ('ts_diff1_mean', '>=', 10), # Large changes + ('ts_crossing_rate', '>=', 0.4) # Frequent crossings + ], + 'model': 'neuralprophet', + 'priority': 9 + }, + # Rule 10: High volatility with complex patterns → AutoMLX + 'automlx_1': { + 'conditions': [ + ('ts_cv', '>=', 0.6), # High variation + ('ts_nonlinearity', '>=', 1e7), # Strong nonlinearity + ('ts_spikes_rate', '>=', 0.1), # Frequent spikes + ('ts_entropy', '>=', 0.7), # Very complex + ('ts_std_residuals', '>=', 1000) # Large residuals + ], + 'model': 'automlx', + 'priority': 10 + }, + # Rule 11: Unstable patterns with regime changes → AutoMLX + 'automlx_2': { + 'conditions': [ + ('ts_trend_change', '>=', 200), # Changing trend + ('ts_turning_points_rate', '>=', 0.4), # Many turning points + ('ts_diff2_variance', '>=', 100), # Variable acceleration + ('ts_hurst', '<', -0.2), # Anti-persistent + ('ts_step_max', '>=', 1000) # Large steps + ], + 'model': 'automlx', + 'priority': 11 + }, + # Rule 12: Long series with stable seasonality → AutoTS + 'autots_1': { + 'conditions': [ + ('ts_n_obs', '>=', 150), # Long series + ('ts_seasonal_strength', '>=', 0.2), # Moderate seasonality + ('ts_cv', '<', 0.5), # Moderate variation + ('ts_entropy', '<', 0.5), # Not too complex + ('ts_acf1', '>=', 0.3) # Some autocorrelation + ], + 'model': 'autots', + 'priority': 12 + }, + # Rule 13: Stable patterns with low noise → Prophet + 'prophet_3': { + 'conditions': [ + ('ts_cv', '<', 0.3), # Low variation + ('ts_kurtosis', '<', 4), # Normal-like + ('ts_turning_points_rate', '<', 0.25), # Few turning points + ('ts_diff1_variance', '<', 50), # Stable changes + ('ts_seasonal_strength', '>=', 0.1) # Some seasonality + ], + 'model': 'prophet', + 'priority': 13 + }, + # Rule 14: Short series with strong linear patterns → ARIMA + 'arima_3': { + 'conditions': [ + ('ts_n_obs', '<', 100), # Short series + ('ts_trend', 'abs>=', 0.2), # Strong trend + ('ts_entropy', '<', 0.4), # Simple patterns + ('ts_nonlinearity', '<', 1e5), # Linear + ('ts_seasonal_strength', '<', 0.2) # Weak seasonality + ], + 'model': 'arima', + 'priority': 14 + }, + # Rule 15: Complex seasonal patterns with long memory → NeuralProphet + 'neuralprophet_3': { + 'conditions': [ + ('ts_n_obs', '>=', 300), # Very long series + ('ts_seasonal_strength', '>=', 0.3), # Clear seasonality + ('ts_acf10', '>=', 0.3), # Long memory + ('ts_hurst', '>', 0), # Persistent + ('ts_nonlinearity', '>=', 5e5) # Some nonlinearity + ], + 'model': 'neuralprophet', + 'priority': 15 + }, + # Rule 16: High complexity with non-normal distribution → AutoMLX + 'automlx_3': { + 'conditions': [ + ('ts_kurtosis', '>=', 5), # Heavy tails + ('ts_skewness', 'abs>=', 2), # Highly skewed + ('ts_entropy', '>=', 0.6), # Complex + ('ts_spikes_rate', '>=', 0.05), # Some spikes + 
('ts_diff2_mean', '>=', 5) # Changing acceleration + ], + 'model': 'automlx', + 'priority': 16 + }, + # Rule 17: Simple patterns with weak seasonality → AutoTS + 'autots_2': { + 'conditions': [ + ('ts_entropy', '<', 0.3), # Simple patterns + ('ts_seasonal_strength', '<', 0.3), # Weak seasonality + ('ts_cv', '<', 0.4), # Low variation + ('ts_nonlinearity', '<', 1e5), # Nearly linear + ('ts_diff1_mean', '<', 10) # Small changes + ], + 'model': 'autots', + 'priority': 17 + } + } + + def _evaluate_condition(self, value, operator, threshold): + """Evaluate a single condition based on pre-defined operators""" + if pd.isna(value): + return False + + if operator == '>=': + return value >= threshold + elif operator == '>': + return value > threshold + elif operator == '<': + return value < threshold + elif operator == '<=': + return value <= threshold + elif operator == 'abs>=': + return abs(value) >= threshold + elif operator == 'abs<': + return abs(value) < threshold + return False + + def _check_model_conditions(self, features, model_rules): + """Check if a series meets all conditions for a model""" + for feature, operator, threshold in model_rules['conditions']: + if feature not in features: + return False + if not self._evaluate_condition(features[feature], operator, threshold): + return False + return True + + def select_best_model(self, meta_features_df): + """ + Select the best model for each series based on pre-learned rules. + + Parameters + ---------- + meta_features_df : pandas.DataFrame + DataFrame containing meta-features for each series, as returned by + build_fforms_meta_features + + Returns + ------- + pandas.DataFrame + DataFrame with series identifiers, selected model names, and matching rule info + """ + results = [] + + # Process each series + for _, row in meta_features_df.iterrows(): + series_info = {} + + # Preserve group columns if they exist + group_cols = [col for col in row.index if not col.startswith('ts_')] + for col in group_cols: + series_info[col] = row[col] + + # Find matching models + matching_models = [] + matched_features = {} + for rule_name, rules in self._meta_rules.items(): + if self._check_model_conditions(row, rules): + matching_models.append((rule_name, rules['priority'])) + # Store which features triggered this rule + matched_features[rule_name] = [ + (feature, row[feature]) + for feature, _, _ in rules['conditions'] + ] + + # Select best model based on priority + if matching_models: + best_rule = min(matching_models, key=lambda x: x[1])[0] + best_model = self._meta_rules[best_rule]['model'] + series_info['matched_features'] = matched_features[best_rule] + else: + best_rule = 'default' + best_model = 'prophet' # Default to prophet if no rules match + series_info['matched_features'] = [] + + series_info['selected_model'] = best_model + series_info['rule_matched'] = best_rule + results.append(series_info) + + return pd.DataFrame(results) + + def get_model_conditions(self): + """ + Get the pre-learned conditions for each model. + This is read-only and cannot be modified at runtime. 
+ + Returns + ------- + dict + Dictionary containing the conditions for each model + """ + return self._meta_rules.copy() diff --git a/ads/opctl/operator/lowcode/forecast/model/base_model.py b/ads/opctl/operator/lowcode/forecast/model/base_model.py index d5b225519..b73865e43 100644 --- a/ads/opctl/operator/lowcode/forecast/model/base_model.py +++ b/ads/opctl/operator/lowcode/forecast/model/base_model.py @@ -8,6 +8,7 @@ import tempfile import time import traceback +import warnings from abc import ABC, abstractmethod from typing import Tuple @@ -98,10 +99,18 @@ def __init__(self, config: ForecastOperatorConfig, datasets: ForecastDatasets): self.spec.tuning.n_trials is not None ) - def generate_report(self): - """Generates the forecasting report.""" - import warnings + def build_model(self): + """Builds the model and returns the result DataFrame and elapsed time.""" + import time + start_time = time.time() + result_df = self._build_model() + elapsed_time = time.time() - start_time + logger.info("Building the models completed in %s seconds", elapsed_time) + return result_df, elapsed_time + + def generate_report(self, result_df=None, elapsed_time=None): + """Generates the forecasting report. Optionally accepts a precomputed result_df and elapsed_time.""" from sklearn.exceptions import ConvergenceWarning with warnings.catch_warnings(): @@ -114,10 +123,8 @@ def generate_report(self): if self.spec.previous_output_dir is not None: self._load_model() - start_time = time.time() - result_df = self._build_model() - elapsed_time = time.time() - start_time - logger.info("Building the models completed in %s seconds", elapsed_time) + if result_df is None or elapsed_time is None: + result_df, elapsed_time = self.build_model() # Generate metrics summary_metrics = None @@ -492,7 +499,10 @@ def _save_report( report.save(rc.Block(*report_sections), report_local_path) enable_print() - report_path = os.path.join(unique_output_dir, self.spec.report_filename) + report_path = self._get_unique_filename( + os.path.join(unique_output_dir, self.spec.report_filename), + storage_options + ) write_file( local_filename=report_local_path, remote_filename=report_path, @@ -511,9 +521,13 @@ def _save_report( else result_df.drop(DataColumns.Series, axis=1) ) if self.spec.generate_forecast_file: + forecast_path = self._get_unique_filename( + os.path.join(unique_output_dir, self.spec.forecast_filename), + storage_options + ) write_data( data=result_df, - filename=os.path.join(unique_output_dir, self.spec.forecast_filename), + filename=forecast_path, format="csv", storage_options=storage_options, ) @@ -531,11 +545,13 @@ def _save_report( {"index": "metrics", "Series 1": metrics_col_name}, axis=1 ) if self.spec.generate_metrics_file: + metrics_path = self._get_unique_filename( + os.path.join(unique_output_dir, self.spec.metrics_filename), + storage_options + ) write_data( data=metrics_df_formatted, - filename=os.path.join( - unique_output_dir, self.spec.metrics_filename - ), + filename=metrics_path, format="csv", storage_options=storage_options, index=False, @@ -553,11 +569,13 @@ def _save_report( {"index": "metrics", "Series 1": metrics_col_name}, axis=1 ) if self.spec.generate_metrics_file: + test_metrics_path = self._get_unique_filename( + os.path.join(unique_output_dir, self.spec.test_metrics_filename), + storage_options + ) write_data( data=test_metrics_df_formatted, - filename=os.path.join( - unique_output_dir, self.spec.test_metrics_filename - ), + filename=test_metrics_path, format="csv", storage_options=storage_options, 
index=False, @@ -567,6 +585,7 @@ def _save_report( logger.warning( f"Attempted to generate the {self.spec.test_metrics_filename} file with the test metrics, however the test metrics could not be properly generated." ) + # explanations csv reports if self.spec.generate_explanations: try: @@ -579,11 +598,13 @@ def _save_report( else col ) if self.spec.generate_explanation_files: + global_exp_path = self._get_unique_filename( + os.path.join(unique_output_dir, self.spec.global_explanation_filename), + storage_options + ) write_data( data=global_expl_rounded, - filename=os.path.join( - unique_output_dir, self.spec.global_explanation_filename - ), + filename=global_exp_path, format="csv", storage_options=storage_options, index=True, @@ -603,11 +624,13 @@ def _save_report( else col ) if self.spec.generate_explanation_files: + local_exp_path = self._get_unique_filename( + os.path.join(unique_output_dir, self.spec.local_explanation_filename), + storage_options + ) write_data( data=local_expl_rounded, - filename=os.path.join( - unique_output_dir, self.spec.local_explanation_filename - ), + filename=local_exp_path, format="csv", storage_options=storage_options, index=True, @@ -625,9 +648,13 @@ def _save_report( if self.spec.generate_model_parameters: # model params + model_params_path = self._get_unique_filename( + os.path.join(unique_output_dir, "model_params.json"), + storage_options + ) write_data( data=pd.DataFrame.from_dict(self.model_parameters), - filename=os.path.join(unique_output_dir, "model_params.json"), + filename=model_params_path, format="json", storage_options=storage_options, index=True, @@ -637,7 +664,16 @@ def _save_report( # model pickle if self.spec.generate_model_pickle: - self._save_model(unique_output_dir, storage_options) + pickle_path = self._get_unique_filename( + os.path.join(unique_output_dir, "model.pkl"), + storage_options + ) + write_pkl( + obj=self.models, + filename=os.path.basename(pickle_path), + output_dir=os.path.dirname(pickle_path), + storage_options=storage_options, + ) results.set_models(self.models) logger.info( @@ -648,11 +684,13 @@ def _save_report( f"The outputs have been successfully generated and placed into the directory: {unique_output_dir}." ) if self.errors_dict: + errors_path = self._get_unique_filename( + os.path.join(unique_output_dir, self.spec.errors_dict_filename), + storage_options + ) write_json( json_dict=self.errors_dict, - filename=os.path.join( - unique_output_dir, self.spec.errors_dict_filename - ), + filename=errors_path, storage_options=storage_options, ) results.set_errors_dict(self.errors_dict) @@ -873,3 +911,61 @@ def _custom_predict( return fcst return _custom_predict + + def _get_unique_filename(self, base_path: str, storage_options: dict = None) -> str: + """Generate a unique filename by appending a sequential number if file exists. 
+ + Args: + base_path: The original file path to check + storage_options: Optional storage options for OCI paths + + Returns: + A unique file path that doesn't exist + """ + if not ObjectStorageDetails.is_oci_path(base_path): + # For local files + directory = os.path.dirname(base_path) + basename = os.path.basename(base_path) + name, ext = os.path.splitext(basename) + + model_suffix = "_" + self.spec.model + new_name = f"{name}{model_suffix}" + new_path = os.path.join(directory, f"{new_name}{ext}") + counter = 1 + while os.path.exists(new_path): + new_path = os.path.join(directory, f"{new_name}_{counter}{ext}") + counter += 1 + return new_path + else: + # For OCI paths, we need to list objects and check + try: + from oci.object_storage import ObjectStorageClient + client = ObjectStorageClient(config=storage_options) + + # Parse OCI path components + bucket_name = ObjectStorageDetails.get_bucket_name(base_path) + namespace = ObjectStorageDetails.get_namespace(base_path) + object_name = ObjectStorageDetails.get_object_name(base_path) + + name, ext = os.path.splitext(object_name) + + model_suffix = "_" + self.spec.model + new_name = f"{name}{model_suffix}" + new_object_name = f"{new_name}{ext}" + counter = 1 + while True: + try: + # Try to head the object to see if it exists + client.head_object(namespace, bucket_name, new_object_name) + # If we get here, the object exists + new_object_name = f"{new_name}_{counter}{ext}" + counter += 1 + except: + # Object doesn't exist, we can use this name + break + + # Reconstruct the full path + return ObjectStorageDetails.get_path(namespace, bucket_name, new_object_name) + except Exception as e: + logger.warning(f"Error checking OCI path existence: {e}. Using original path.") + return base_path diff --git a/ads/opctl/operator/lowcode/forecast/model/factory.py b/ads/opctl/operator/lowcode/forecast/model/factory.py index 09583c58b..566244405 100644 --- a/ads/opctl/operator/lowcode/forecast/model/factory.py +++ b/ads/opctl/operator/lowcode/forecast/model/factory.py @@ -3,7 +3,9 @@ # Copyright (c) 2023, 2024 Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ -from ..const import AUTO_SELECT, SpeedAccuracyMode, SupportedModels +from ads.opctl.operator.lowcode.common.transformations import Transformations +from ..const import AUTO_SELECT, SpeedAccuracyMode, SupportedModels, AUTO_SELECT_SERIES +from ..meta_selector import MetaSelector from ..model_evaluator import ModelEvaluator from ..operator_config import ForecastOperatorConfig from .arima import ArimaOperatorModel @@ -63,7 +65,28 @@ def get_model( In case of not supported model. 
""" model_type = operator_config.spec.model - if model_type == AUTO_SELECT: + + if model_type == AUTO_SELECT_SERIES: + # Initialize MetaSelector for series-specific model selection + selector = MetaSelector() + # Create a Transformations instance + transformer = Transformations(dataset_info=datasets.historical_data.spec) + + # Calculate meta-features + meta_features = selector.select_best_model( + meta_features_df=transformer.build_fforms_meta_features( + data=datasets.historical_data.raw_data, + target_col=datasets.historical_data.spec.target_column, + group_cols=datasets.historical_data.spec.target_category_columns + ) + ) + # Get the most common model as default + model_type = meta_features['selected_model'].mode().iloc[0] + # Store the series-specific model selections in the config for later use + operator_config.spec.meta_features = meta_features + operator_config.spec.model_kwargs = {} + + elif model_type == AUTO_SELECT: model_type = cls.auto_select_model(datasets, operator_config) operator_config.spec.model_kwargs = {} # set the explanations accuracy mode to AUTOMLX if the selected model is automlx diff --git a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py index 342721c83..26ddcb225 100644 --- a/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py +++ b/ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py @@ -23,8 +23,9 @@ class HistoricalData(AbstractData): - def __init__(self, spec, historical_data=None): - super().__init__(spec=spec, name="historical_data", data=historical_data) + def __init__(self, spec, historical_data=None, subset=None): + super().__init__(spec=spec, name="historical_data", data=historical_data, subset=subset) + self.subset = subset def _ingest_data(self, spec): try: @@ -52,12 +53,13 @@ def _verify_dt_col(self, spec): class AdditionalData(AbstractData): - def __init__(self, spec, historical_data, additional_data=None): + def __init__(self, spec, historical_data, additional_data=None, subset=None): + self.subset = subset if additional_data is not None: - super().__init__(spec=spec, name="additional_data", data=additional_data) + super().__init__(spec=spec, name="additional_data", data=additional_data, subset=subset) self.additional_regressors = list(self.data.columns) elif spec.additional_data is not None: - super().__init__(spec=spec, name="additional_data") + super().__init__(spec=spec, name="additional_data", subset=subset) add_dates = self.data.index.get_level_values(0).unique().tolist() add_dates.sort() if historical_data.get_max_time() > add_dates[-spec.horizon]: @@ -127,6 +129,7 @@ def __init__( historical_data=None, additional_data=None, test_data=None, + subset=None, # New parameter for subsetting by group ): """Instantiates the DataIO instance. @@ -134,26 +137,28 @@ def __init__( ---------- config: ForecastOperatorConfig The forecast operator configuration. + subset: list, optional + List of group keys to subset the data on initialization. 
""" + self.config = config # Store the config for later use self.historical_data: HistoricalData = None self.additional_data: AdditionalData = None self._horizon = config.spec.horizon self._datetime_column_name = config.spec.datetime_column.name self._target_col = config.spec.target_column if historical_data is not None: - self.historical_data = HistoricalData(config.spec, historical_data) + self.historical_data = HistoricalData(config.spec, historical_data, subset=subset) self.additional_data = AdditionalData( - config.spec, self.historical_data, additional_data + config.spec, self.historical_data, additional_data, subset=subset ) else: - self._load_data(config.spec) + self._load_data(config.spec, subset=subset) self.test_data = TestData(config.spec, test_data) - def _load_data(self, spec): + def _load_data(self, spec, subset=None): """Loads forecasting input data.""" - self.historical_data = HistoricalData(spec) - self.additional_data = AdditionalData(spec, self.historical_data) - + self.historical_data = HistoricalData(spec, subset=subset) + self.additional_data = AdditionalData(spec, self.historical_data, subset=subset) if spec.generate_explanations and spec.additional_data is None: logger.warning( "Unable to generate explanations as there is no additional data passed in. Either set generate_explanations to False, or pass in additional data." @@ -490,3 +495,25 @@ def set_errors_dict(self, errors_dict: Dict): def get_errors_dict(self): return getattr(self, "errors_dict", None) + + def merge(self, other: 'ForecastResults'): + """Merge another ForecastResults object into this one.""" + # Merge DataFrames if they exist, else just set + for attr in [ + 'forecast', 'metrics', 'test_metrics', 'local_explanations', 'global_explanations', 'model_parameters', 'models', 'errors_dict']: + val_self = getattr(self, attr, None) + val_other = getattr(other, attr, None) + if val_self is not None and val_other is not None: + if isinstance(val_self, pd.DataFrame) and isinstance(val_other, pd.DataFrame): + setattr(self, attr, pd.concat([val_self, val_other], ignore_index=True, axis=0)) + elif isinstance(val_self, dict) and isinstance(val_other, dict): + val_self.update(val_other) + setattr(self, attr, val_self) + elif isinstance(val_self, list) and isinstance(val_other, list): + setattr(self, attr, val_self + val_other) + else: + # If not mergeable, just keep self's value + pass + elif val_other is not None: + setattr(self, attr, val_other) + return self diff --git a/ads/opctl/operator/lowcode/forecast/schema.yaml b/ads/opctl/operator/lowcode/forecast/schema.yaml index 1b2684448..fe7c90df5 100644 --- a/ads/opctl/operator/lowcode/forecast/schema.yaml +++ b/ads/opctl/operator/lowcode/forecast/schema.yaml @@ -459,6 +459,7 @@ spec: - automlx - autots - auto-select + - auto-select-series model_kwargs: type: dict From 5097c9e4cda16751e999a0a194f5764910ebf92d Mon Sep 17 00:00:00 2001 From: Vikas Pandey Date: Fri, 20 Jun 2025 07:00:24 +0530 Subject: [PATCH 2/3] Refactor forecasting model report generation to support saving sub-reports with unique filenames --- .../operator/lowcode/forecast/__main__.py | 24 ++++-- .../lowcode/forecast/model/base_model.py | 77 ++++++++----------- 2 files changed, 50 insertions(+), 51 deletions(-) diff --git a/ads/opctl/operator/lowcode/forecast/__main__.py b/ads/opctl/operator/lowcode/forecast/__main__.py index 148c9b154..57809886d 100644 --- a/ads/opctl/operator/lowcode/forecast/__main__.py +++ b/ads/opctl/operator/lowcode/forecast/__main__.py @@ -29,31 +29,39 @@ def 
operate(operator_config: ForecastOperatorConfig) -> ForecastResults: datasets = ForecastDatasets(operator_config) model = ForecastOperatorModelFactory.get_model(operator_config, datasets) - if operator_config.spec.model == AUTO_SELECT_SERIES and hasattr(operator_config.spec, 'meta_features'): + if operator_config.spec.model == AUTO_SELECT_SERIES and hasattr( + operator_config.spec, "meta_features" + ): # For AUTO_SELECT_SERIES, handle each series with its specific model meta_features = operator_config.spec.meta_features results = ForecastResults() - results_df = pd.DataFrame() - elapsed_time = 0 - sub_model_list = [] sub_results_list = [] # Group the data by selected model - for model_name in meta_features['selected_model'].unique(): + for model_name in meta_features["selected_model"].unique(): # Get series that use this model - series_groups = meta_features[meta_features['selected_model'] == model_name] + series_groups = meta_features[meta_features["selected_model"] == model_name] # Create a sub-config for this model sub_config = copy.deepcopy(operator_config) sub_config.spec.model = model_name # Create sub-datasets for these series - sub_datasets = ForecastDatasets(operator_config, subset=series_groups[operator_config.spec.target_category_columns].values.flatten().tolist()) + sub_datasets = ForecastDatasets( + operator_config, + subset=series_groups[operator_config.spec.target_category_columns] + .values.flatten() + .tolist(), + ) # Get and run the appropriate model sub_model = ForecastOperatorModelFactory.get_model(sub_config, sub_datasets) sub_result_df, sub_elapsed_time = sub_model.build_model() - sub_results = sub_model.generate_report(result_df=sub_result_df, elapsed_time=sub_elapsed_time) + sub_results = sub_model.generate_report( + result_df=sub_result_df, + elapsed_time=sub_elapsed_time, + save_sub_reports=True, + ) sub_results_list.append(sub_results) # results_df = pd.concat([results_df, sub_result_df], ignore_index=True, axis=0) diff --git a/ads/opctl/operator/lowcode/forecast/model/base_model.py b/ads/opctl/operator/lowcode/forecast/model/base_model.py index b73865e43..a57299e1a 100644 --- a/ads/opctl/operator/lowcode/forecast/model/base_model.py +++ b/ads/opctl/operator/lowcode/forecast/model/base_model.py @@ -109,8 +109,11 @@ def build_model(self): logger.info("Building the models completed in %s seconds", elapsed_time) return result_df, elapsed_time - def generate_report(self, result_df=None, elapsed_time=None): - """Generates the forecasting report. Optionally accepts a precomputed result_df and elapsed_time.""" + def generate_report( + self, result_df=None, elapsed_time=None, save_sub_reports=False + ): + """Generates the forecasting report. Optionally accepts a precomputed result_df and elapsed_time. + If save_sub_reports is True, unique filenames are generated for all outputs.""" from sklearn.exceptions import ConvergenceWarning with warnings.catch_warnings(): @@ -361,6 +364,7 @@ def generate_report(self, result_df=None, elapsed_time=None): metrics_df=self.eval_metrics, test_metrics_df=self.test_eval_metrics, test_data=test_data, + save_sub_reports=save_sub_reports, ) def _test_evaluate_metrics(self, elapsed_time=0): @@ -478,8 +482,9 @@ def _save_report( metrics_df: pd.DataFrame, test_metrics_df: pd.DataFrame, test_data: pd.DataFrame, + save_sub_reports: bool = False, ): - """Saves resulting reports to the given folder.""" + """Saves resulting reports to the given folder. 
If save_sub_reports is True, use unique filenames.""" unique_output_dir = self.spec.output_directory.url results = ForecastResults() @@ -490,6 +495,12 @@ def _save_report( else {} ) + def get_path(filename): + path = os.path.join(unique_output_dir, filename) + if save_sub_reports: + return self._get_unique_filename(path, storage_options) + return path + # report-creator html report if self.spec.generate_report: with tempfile.TemporaryDirectory() as temp_dir: @@ -499,10 +510,7 @@ def _save_report( report.save(rc.Block(*report_sections), report_local_path) enable_print() - report_path = self._get_unique_filename( - os.path.join(unique_output_dir, self.spec.report_filename), - storage_options - ) + report_path = get_path(self.spec.report_filename) write_file( local_filename=report_local_path, remote_filename=report_path, @@ -521,10 +529,7 @@ def _save_report( else result_df.drop(DataColumns.Series, axis=1) ) if self.spec.generate_forecast_file: - forecast_path = self._get_unique_filename( - os.path.join(unique_output_dir, self.spec.forecast_filename), - storage_options - ) + forecast_path = get_path(self.spec.forecast_filename) write_data( data=result_df, filename=forecast_path, @@ -545,10 +550,7 @@ def _save_report( {"index": "metrics", "Series 1": metrics_col_name}, axis=1 ) if self.spec.generate_metrics_file: - metrics_path = self._get_unique_filename( - os.path.join(unique_output_dir, self.spec.metrics_filename), - storage_options - ) + metrics_path = get_path(self.spec.metrics_filename) write_data( data=metrics_df_formatted, filename=metrics_path, @@ -569,10 +571,7 @@ def _save_report( {"index": "metrics", "Series 1": metrics_col_name}, axis=1 ) if self.spec.generate_metrics_file: - test_metrics_path = self._get_unique_filename( - os.path.join(unique_output_dir, self.spec.test_metrics_filename), - storage_options - ) + test_metrics_path = get_path(self.spec.test_metrics_filename) write_data( data=test_metrics_df_formatted, filename=test_metrics_path, @@ -598,9 +597,8 @@ def _save_report( else col ) if self.spec.generate_explanation_files: - global_exp_path = self._get_unique_filename( - os.path.join(unique_output_dir, self.spec.global_explanation_filename), - storage_options + global_exp_path = get_path( + self.spec.global_explanation_filename ) write_data( data=global_expl_rounded, @@ -624,10 +622,7 @@ def _save_report( else col ) if self.spec.generate_explanation_files: - local_exp_path = self._get_unique_filename( - os.path.join(unique_output_dir, self.spec.local_explanation_filename), - storage_options - ) + local_exp_path = get_path(self.spec.local_explanation_filename) write_data( data=local_expl_rounded, filename=local_exp_path, @@ -648,10 +643,7 @@ def _save_report( if self.spec.generate_model_parameters: # model params - model_params_path = self._get_unique_filename( - os.path.join(unique_output_dir, "model_params.json"), - storage_options - ) + model_params_path = get_path("model_params.json") write_data( data=pd.DataFrame.from_dict(self.model_parameters), filename=model_params_path, @@ -664,10 +656,7 @@ def _save_report( # model pickle if self.spec.generate_model_pickle: - pickle_path = self._get_unique_filename( - os.path.join(unique_output_dir, "model.pkl"), - storage_options - ) + pickle_path = get_path("model.pkl") write_pkl( obj=self.models, filename=os.path.basename(pickle_path), @@ -684,10 +673,7 @@ def _save_report( f"The outputs have been successfully generated and placed into the directory: {unique_output_dir}." 
)
         if self.errors_dict:
-            errors_path = self._get_unique_filename(
-                os.path.join(unique_output_dir, self.spec.errors_dict_filename),
-                storage_options
-            )
+            errors_path = get_path(self.spec.errors_dict_filename)
             write_json(
                 json_dict=self.errors_dict,
                 filename=errors_path,
@@ -887,9 +873,9 @@ def local_explainer(self, kernel_explainer, series_id, datetime_col_name) -> Non
         # Add date column to local explanation DataFrame
         local_kernel_explnr_df[ForecastOutputColumns.DATE] = (
-            self.datasets.get_horizon_at_series(
-                s_id=series_id
-            )[self.spec.datetime_column.name].reset_index(drop=True)
+            self.datasets.get_horizon_at_series(s_id=series_id)[
+                self.spec.datetime_column.name
+            ].reset_index(drop=True)
         )
         self.local_explanation[series_id] = local_kernel_explnr_df

@@ -940,6 +926,7 @@ def _get_unique_filename(self, base_path: str, storage_options: dict = None) ->
             # For OCI paths, we need to list objects and check
             try:
                 from oci.object_storage import ObjectStorageClient
+
                 client = ObjectStorageClient(config=storage_options)
 
                 # Parse OCI path components
@@ -965,7 +952,11 @@ def _get_unique_filename(self, base_path: str, storage_options: dict = None) ->
                     break
 
             # Reconstruct the full path
-            return ObjectStorageDetails.get_path(namespace, bucket_name, new_object_name)
+            return ObjectStorageDetails.get_path(
+                namespace, bucket_name, new_object_name
+            )
         except Exception as e:
-            logger.warning(f"Error checking OCI path existence: {e}. Using original path.")
+            logger.warning(
+                f"Error checking OCI path existence: {e}. Using original path."
+            )
             return base_path

From 4a2e7106980dbaee5c7dc9efc321dbc12d58c9cc Mon Sep 17 00:00:00 2001
From: Vikas Pandey
Date: Fri, 20 Jun 2025 07:20:24 +0530
Subject: [PATCH 3/3] Remove unused import and format using ruff

---
 .../lowcode/forecast/meta_selector.py         | 352 +++++++++---------
 1 file changed, 175 insertions(+), 177 deletions(-)

diff --git a/ads/opctl/operator/lowcode/forecast/meta_selector.py b/ads/opctl/operator/lowcode/forecast/meta_selector.py
index 18c243bad..76390b279 100644
--- a/ads/opctl/operator/lowcode/forecast/meta_selector.py
+++ b/ads/opctl/operator/lowcode/forecast/meta_selector.py
@@ -3,9 +3,8 @@
 # Copyright (c) 2023, 2025 Oracle and/or its affiliates. 
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/ -import pandas as pd import numpy as np -from ..common.transformations import Transformations +import pandas as pd class MetaSelector: @@ -19,226 +18,226 @@ def __init__(self): # Pre-learned rules based on meta-features self._meta_rules = { # Rule 1: Strong trend, weak seasonality → ARIMA - 'arima_0': { - 'conditions': [ - ('ts_trend', 'abs>=', 0.65), # Strong trend strength - ('ts_seasonal_strength', '<', 0.20), # Weak seasonality + "arima_0": { + "conditions": [ + ("ts_trend", "abs>=", 0.65), # Strong trend strength + ("ts_seasonal_strength", "<", 0.20), # Weak seasonality ], - 'model': 'arima', - 'priority': 1 + "model": "arima", + "priority": 1, }, # Rule 2: Strong seasonality, long series → Prophet - 'prophet_0': { - 'conditions': [ - ('ts_seasonal_strength', '>=', 0.50), # Strong seasonality - ('ts_n_obs', '>=', 200), # Long series + "prophet_0": { + "conditions": [ + ("ts_seasonal_strength", ">=", 0.50), # Strong seasonality + ("ts_n_obs", ">=", 200), # Long series ], - 'model': 'prophet', - 'priority': 2 + "model": "prophet", + "priority": 2, }, # Rule 3: High entropy, low autocorrelation → AutoMLX - 'automlx_0': { - 'conditions': [ - ('ts_entropy', '>=', 4.0), # High entropy - ('ts_acf1', '<=', 0.30), # Low autocorrelation + "automlx_0": { + "conditions": [ + ("ts_entropy", ">=", 4.0), # High entropy + ("ts_acf1", "<=", 0.30), # Low autocorrelation ], - 'model': 'automlx', - 'priority': 3 + "model": "automlx", + "priority": 3, }, # Rule 4: Strong seasonality with trend and changing patterns → Prophet - 'prophet_1': { - 'conditions': [ - ('ts_seasonal_strength', '>=', 0.3), # Strong seasonality - ('ts_trend', 'abs>=', 0.1), # Clear trend - ('ts_turning_points_rate', '>=', 0.2), # Multiple change points - ('ts_n_obs', '>=', 50), # Sufficient data - ('ts_step_max', '>=', 100), # Significant steps - ('ts_diff1_variance', '>=', 10) # Variable differences + "prophet_1": { + "conditions": [ + ("ts_seasonal_strength", ">=", 0.3), # Strong seasonality + ("ts_trend", "abs>=", 0.1), # Clear trend + ("ts_turning_points_rate", ">=", 0.2), # Multiple change points + ("ts_n_obs", ">=", 50), # Sufficient data + ("ts_step_max", ">=", 100), # Significant steps + ("ts_diff1_variance", ">=", 10), # Variable differences ], - 'model': 'prophet', - 'priority': 4 + "model": "prophet", + "priority": 4, }, # Rule 5: Multiple seasonality with nonlinear patterns → Prophet - 'prophet_2': { - 'conditions': [ - ('ts_seasonal_peak_strength', '>=', 0.4), # Strong peak seasonality - ('ts_seasonal_strength', '>=', 0.2), # Overall seasonality - ('ts_acf10', '>=', 0.2), # Long-term correlation - ('ts_entropy', '>=', 0.5), # Complex patterns - ('ts_crossing_rate', '>=', 0.3) # Frequent mean crossings + "prophet_2": { + "conditions": [ + ("ts_seasonal_peak_strength", ">=", 0.4), # Strong peak seasonality + ("ts_seasonal_strength", ">=", 0.2), # Overall seasonality + ("ts_acf10", ">=", 0.2), # Long-term correlation + ("ts_entropy", ">=", 0.5), # Complex patterns + ("ts_crossing_rate", ">=", 0.3), # Frequent mean crossings ], - 'model': 'prophet', - 'priority': 5 + "model": "prophet", + "priority": 5, }, # Rule 6: Strong autocorrelation with stationary behavior → ARIMA - 'arima_1': { - 'conditions': [ - ('ts_acf1', '>=', 0.7), # Strong lag-1 correlation - ('ts_acf2', '>=', 0.5), # Strong lag-2 correlation - ('ts_seasonal_strength', '<', 0.3), # Weak seasonality - ('ts_std_residuals', '<', 500), # Stable residuals - 
-                ('ts_diff1_variance', '<', 100),  # Stable first differences
-                ('ts_hurst', '>', -0.1)  # Some persistence
+        "arima_1": {
+            "conditions": [
+                ("ts_acf1", ">=", 0.7),  # Strong lag-1 correlation
+                ("ts_acf2", ">=", 0.5),  # Strong lag-2 correlation
+                ("ts_seasonal_strength", "<", 0.3),  # Weak seasonality
+                ("ts_std_residuals", "<", 500),  # Stable residuals
+                ("ts_diff1_variance", "<", 100),  # Stable first differences
+                ("ts_hurst", ">", -0.1),  # Some persistence
             ],
-            'model': 'arima',
-            'priority': 6
+            "model": "arima",
+            "priority": 6,
         },
         # Rule 7: Linear trend with moderate noise → ARIMA
-        'arima_2': {
-            'conditions': [
-                ('ts_trend', 'abs>=', 0.15),  # Clear trend
-                ('ts_trend_change', '<', 100),  # Stable trend
-                ('ts_cv', '<', 0.4),  # Low variation
-                ('ts_kurtosis', '<', 5),  # Normal-like distribution
-                ('ts_nonlinearity', '<', 1e5)  # Linear relationships
+        "arima_2": {
+            "conditions": [
+                ("ts_trend", "abs>=", 0.15),  # Clear trend
+                ("ts_trend_change", "<", 100),  # Stable trend
+                ("ts_cv", "<", 0.4),  # Low variation
+                ("ts_kurtosis", "<", 5),  # Normal-like distribution
+                ("ts_nonlinearity", "<", 1e5),  # Linear relationships
             ],
-            'model': 'arima',
-            'priority': 7
+            "model": "arima",
+            "priority": 7,
         },
         # Rule 8: Complex seasonality with high nonlinearity → NeuralProphet
-        'neuralprophet_1': {
-            'conditions': [
-                ('ts_seasonal_peak_strength', '>=', 0.5),  # Strong seasonal peaks
-                ('ts_nonlinearity', '>=', 1e6),  # Nonlinear patterns
-                ('ts_n_obs', '>=', 200),  # Long series
-                ('ts_entropy', '>=', 0.6),  # Complex patterns
-                ('ts_diff2_variance', '>=', 50)  # Variable acceleration
+        "neuralprophet_1": {
+            "conditions": [
+                ("ts_seasonal_peak_strength", ">=", 0.5),  # Strong seasonal peaks
+                ("ts_nonlinearity", ">=", 1e6),  # Nonlinear patterns
+                ("ts_n_obs", ">=", 200),  # Long series
+                ("ts_entropy", ">=", 0.6),  # Complex patterns
+                ("ts_diff2_variance", ">=", 50),  # Variable acceleration
             ],
-            'model': 'neuralprophet',
-            'priority': 8
+            "model": "neuralprophet",
+            "priority": 8,
         },
         # Rule 9: Multiple seasonal patterns with changing behavior → NeuralProphet
-        'neuralprophet_2': {
-            'conditions': [
-                ('ts_seasonal_strength', '>=', 0.4),  # Strong seasonality
-                ('ts_turning_points_rate', '>=', 0.3),  # Many turning points
-                ('ts_skewness', 'abs>=', 1),  # Skewed distribution
-                ('ts_diff1_mean', '>=', 10),  # Large changes
-                ('ts_crossing_rate', '>=', 0.4)  # Frequent crossings
+        "neuralprophet_2": {
+            "conditions": [
+                ("ts_seasonal_strength", ">=", 0.4),  # Strong seasonality
+                ("ts_turning_points_rate", ">=", 0.3),  # Many turning points
+                ("ts_skewness", "abs>=", 1),  # Skewed distribution
+                ("ts_diff1_mean", ">=", 10),  # Large changes
+                ("ts_crossing_rate", ">=", 0.4),  # Frequent crossings
             ],
-            'model': 'neuralprophet',
-            'priority': 9
+            "model": "neuralprophet",
+            "priority": 9,
         },
         # Rule 10: High volatility with complex patterns → AutoMLX
-        'automlx_1': {
-            'conditions': [
-                ('ts_cv', '>=', 0.6),  # High variation
-                ('ts_nonlinearity', '>=', 1e7),  # Strong nonlinearity
-                ('ts_spikes_rate', '>=', 0.1),  # Frequent spikes
-                ('ts_entropy', '>=', 0.7),  # Very complex
-                ('ts_std_residuals', '>=', 1000)  # Large residuals
+        "automlx_1": {
+            "conditions": [
+                ("ts_cv", ">=", 0.6),  # High variation
+                ("ts_nonlinearity", ">=", 1e7),  # Strong nonlinearity
+                ("ts_spikes_rate", ">=", 0.1),  # Frequent spikes
+                ("ts_entropy", ">=", 0.7),  # Very complex
+                ("ts_std_residuals", ">=", 1000),  # Large residuals
             ],
-            'model': 'automlx',
-            'priority': 10
+            "model": "automlx",
+            "priority": 10,
         },
         # Rule 11: Unstable patterns with regime changes → AutoMLX
-        'automlx_2': {
-            'conditions': [
-                ('ts_trend_change', '>=', 200),  # Changing trend
-                ('ts_turning_points_rate', '>=', 0.4),  # Many turning points
-                ('ts_diff2_variance', '>=', 100),  # Variable acceleration
-                ('ts_hurst', '<', -0.2),  # Anti-persistent
-                ('ts_step_max', '>=', 1000)  # Large steps
+        "automlx_2": {
+            "conditions": [
+                ("ts_trend_change", ">=", 200),  # Changing trend
+                ("ts_turning_points_rate", ">=", 0.4),  # Many turning points
+                ("ts_diff2_variance", ">=", 100),  # Variable acceleration
+                ("ts_hurst", "<", -0.2),  # Anti-persistent
+                ("ts_step_max", ">=", 1000),  # Large steps
             ],
-            'model': 'automlx',
-            'priority': 11
+            "model": "automlx",
+            "priority": 11,
         },
         # Rule 12: Long series with stable seasonality → AutoTS
-        'autots_1': {
-            'conditions': [
-                ('ts_n_obs', '>=', 150),  # Long series
-                ('ts_seasonal_strength', '>=', 0.2),  # Moderate seasonality
-                ('ts_cv', '<', 0.5),  # Moderate variation
-                ('ts_entropy', '<', 0.5),  # Not too complex
-                ('ts_acf1', '>=', 0.3)  # Some autocorrelation
+        "autots_1": {
+            "conditions": [
+                ("ts_n_obs", ">=", 150),  # Long series
+                ("ts_seasonal_strength", ">=", 0.2),  # Moderate seasonality
+                ("ts_cv", "<", 0.5),  # Moderate variation
+                ("ts_entropy", "<", 0.5),  # Not too complex
+                ("ts_acf1", ">=", 0.3),  # Some autocorrelation
             ],
-            'model': 'autots',
-            'priority': 12
+            "model": "autots",
+            "priority": 12,
         },
         # Rule 13: Stable patterns with low noise → Prophet
-        'prophet_3': {
-            'conditions': [
-                ('ts_cv', '<', 0.3),  # Low variation
-                ('ts_kurtosis', '<', 4),  # Normal-like
-                ('ts_turning_points_rate', '<', 0.25),  # Few turning points
-                ('ts_diff1_variance', '<', 50),  # Stable changes
-                ('ts_seasonal_strength', '>=', 0.1)  # Some seasonality
+        "prophet_3": {
+            "conditions": [
+                ("ts_cv", "<", 0.3),  # Low variation
+                ("ts_kurtosis", "<", 4),  # Normal-like
+                ("ts_turning_points_rate", "<", 0.25),  # Few turning points
+                ("ts_diff1_variance", "<", 50),  # Stable changes
+                ("ts_seasonal_strength", ">=", 0.1),  # Some seasonality
             ],
-            'model': 'prophet',
-            'priority': 13
+            "model": "prophet",
+            "priority": 13,
        },
         # Rule 14: Short series with strong linear patterns → ARIMA
-        'arima_3': {
-            'conditions': [
-                ('ts_n_obs', '<', 100),  # Short series
-                ('ts_trend', 'abs>=', 0.2),  # Strong trend
-                ('ts_entropy', '<', 0.4),  # Simple patterns
-                ('ts_nonlinearity', '<', 1e5),  # Linear
-                ('ts_seasonal_strength', '<', 0.2)  # Weak seasonality
+        "arima_3": {
+            "conditions": [
+                ("ts_n_obs", "<", 100),  # Short series
+                ("ts_trend", "abs>=", 0.2),  # Strong trend
+                ("ts_entropy", "<", 0.4),  # Simple patterns
+                ("ts_nonlinearity", "<", 1e5),  # Linear
+                ("ts_seasonal_strength", "<", 0.2),  # Weak seasonality
             ],
-            'model': 'arima',
-            'priority': 14
+            "model": "arima",
+            "priority": 14,
         },
         # Rule 15: Complex seasonal patterns with long memory → NeuralProphet
-        'neuralprophet_3': {
-            'conditions': [
-                ('ts_n_obs', '>=', 300),  # Very long series
-                ('ts_seasonal_strength', '>=', 0.3),  # Clear seasonality
-                ('ts_acf10', '>=', 0.3),  # Long memory
-                ('ts_hurst', '>', 0),  # Persistent
-                ('ts_nonlinearity', '>=', 5e5)  # Some nonlinearity
+        "neuralprophet_3": {
+            "conditions": [
+                ("ts_n_obs", ">=", 300),  # Very long series
+                ("ts_seasonal_strength", ">=", 0.3),  # Clear seasonality
+                ("ts_acf10", ">=", 0.3),  # Long memory
+                ("ts_hurst", ">", 0),  # Persistent
+                ("ts_nonlinearity", ">=", 5e5),  # Some nonlinearity
             ],
-            'model': 'neuralprophet',
-            'priority': 15
+            "model": "neuralprophet",
+            "priority": 15,
         },
         # Rule 16: High complexity with non-normal distribution → AutoMLX
-        'automlx_3': {
-            'conditions': [
-                ('ts_kurtosis', '>=', 5),  # Heavy tails
-                ('ts_skewness', 'abs>=', 2),  # Highly skewed
-                ('ts_entropy', '>=', 0.6),  # Complex
-                ('ts_spikes_rate', '>=', 0.05),  # Some spikes
-                ('ts_diff2_mean', '>=', 5)  # Changing acceleration
+        "automlx_3": {
+            "conditions": [
+                ("ts_kurtosis", ">=", 5),  # Heavy tails
+                ("ts_skewness", "abs>=", 2),  # Highly skewed
+                ("ts_entropy", ">=", 0.6),  # Complex
+                ("ts_spikes_rate", ">=", 0.05),  # Some spikes
+                ("ts_diff2_mean", ">=", 5),  # Changing acceleration
             ],
-            'model': 'automlx',
-            'priority': 16
+            "model": "automlx",
+            "priority": 16,
         },
         # Rule 17: Simple patterns with weak seasonality → AutoTS
-        'autots_2': {
-            'conditions': [
-                ('ts_entropy', '<', 0.3),  # Simple patterns
-                ('ts_seasonal_strength', '<', 0.3),  # Weak seasonality
-                ('ts_cv', '<', 0.4),  # Low variation
-                ('ts_nonlinearity', '<', 1e5),  # Nearly linear
-                ('ts_diff1_mean', '<', 10)  # Small changes
+        "autots_2": {
+            "conditions": [
+                ("ts_entropy", "<", 0.3),  # Simple patterns
+                ("ts_seasonal_strength", "<", 0.3),  # Weak seasonality
+                ("ts_cv", "<", 0.4),  # Low variation
+                ("ts_nonlinearity", "<", 1e5),  # Nearly linear
+                ("ts_diff1_mean", "<", 10),  # Small changes
             ],
-            'model': 'autots',
-            'priority': 17
-        }
+            "model": "autots",
+            "priority": 17,
+        },
     }
 
     def _evaluate_condition(self, value, operator, threshold):
         """Evaluate a single condition based on pre-defined operators"""
         if pd.isna(value):
             return False
-
-        if operator == '>=':
+
+        if operator == ">=":
             return value >= threshold
-        elif operator == '>':
+        elif operator == ">":
             return value > threshold
-        elif operator == '<':
+        elif operator == "<":
             return value < threshold
-        elif operator == '<=':
+        elif operator == "<=":
             return value <= threshold
-        elif operator == 'abs>=':
+        elif operator == "abs>=":
             return abs(value) >= threshold
-        elif operator == 'abs<':
+        elif operator == "abs<":
             return abs(value) < threshold
         return False
 
     def _check_model_conditions(self, features, model_rules):
         """Check if a series meets all conditions for a model"""
-        for feature, operator, threshold in model_rules['conditions']:
+        for feature, operator, threshold in model_rules["conditions"]:
             if feature not in features:
                 return False
             if not self._evaluate_condition(features[feature], operator, threshold):
@@ -248,62 +247,61 @@ def _check_model_conditions(self, features, model_rules):
     def select_best_model(self, meta_features_df):
         """
         Select the best model for each series based on pre-learned rules.
-
+
         Parameters
         ----------
         meta_features_df : pandas.DataFrame
             DataFrame containing meta-features for each series, as returned by build_fforms_meta_features
-
+
         Returns
         -------
         pandas.DataFrame
             DataFrame with series identifiers, selected model names, and matching rule info
         """
         results = []
-
+
         # Process each series
         for _, row in meta_features_df.iterrows():
             series_info = {}
-
+
             # Preserve group columns if they exist
-            group_cols = [col for col in row.index if not col.startswith('ts_')]
+            group_cols = [col for col in row.index if not col.startswith("ts_")]
            for col in group_cols:
                 series_info[col] = row[col]
-
+
             # Find matching models
             matching_models = []
             matched_features = {}
             for rule_name, rules in self._meta_rules.items():
                 if self._check_model_conditions(row, rules):
-                    matching_models.append((rule_name, rules['priority']))
+                    matching_models.append((rule_name, rules["priority"]))
                     # Store which features triggered this rule
                     matched_features[rule_name] = [
-                        (feature, row[feature])
-                        for feature, _, _ in rules['conditions']
+                        (feature, row[feature]) for feature, _, _ in rules["conditions"]
                     ]
-
+
             # Select best model based on priority
             if matching_models:
                 best_rule = min(matching_models, key=lambda x: x[1])[0]
-                best_model = self._meta_rules[best_rule]['model']
-                series_info['matched_features'] = matched_features[best_rule]
+                best_model = self._meta_rules[best_rule]["model"]
+                series_info["matched_features"] = matched_features[best_rule]
             else:
-                best_rule = 'default'
-                best_model = 'prophet'  # Default to prophet if no rules match
-                series_info['matched_features'] = []
-
-            series_info['selected_model'] = best_model
-            series_info['rule_matched'] = best_rule
+                best_rule = "default"
+                best_model = "prophet"  # Default to prophet if no rules match
+                series_info["matched_features"] = []
+
+            series_info["selected_model"] = best_model
+            series_info["rule_matched"] = best_rule
             results.append(series_info)
-
+
        return pd.DataFrame(results)
 
     def get_model_conditions(self):
         """
         Get the pre-learned conditions for each model.
         This is read-only and cannot be modified at runtime.
-
+
         Returns
         -------
         dict
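Reviewer note: below is a minimal usage sketch (not part of the patch) of the priority-based rule matching shown in this hunk. It assumes MetaSelector, from the new ads/opctl/operator/lowcode/forecast/meta_selector.py, can be constructed with no arguments and exposes select_best_model as above; the "Series" column name and all ts_* values are made-up meta-features for illustration, not output of a real build_fforms_meta_features run.

import pandas as pd

from ads.opctl.operator.lowcode.forecast.meta_selector import MetaSelector

# One row per series; every ts_* value here is hypothetical.
meta_features = pd.DataFrame(
    [
        {
            "Series": "store_1",
            "ts_n_obs": 80,  # short series (< 100)
            "ts_trend": -0.35,  # downward trend; 'abs>=' matches since |-0.35| >= 0.2
            "ts_entropy": 0.2,  # simple patterns (< 0.4)
            "ts_nonlinearity": 1e3,  # nearly linear (< 1e5)
            "ts_seasonal_strength": 0.1,  # weak seasonality (< 0.2)
        }
    ]
)

selector = MetaSelector()
result = selector.select_best_model(meta_features)

# Rules referencing features absent from the row fail _check_model_conditions,
# so among the rules visible in this hunk only 'arima_3' (Rule 14) matches and
# ARIMA is selected; a row matching no rule at all would fall back to
# 'prophet' with rule_matched == 'default'.
print(result[["Series", "selected_model", "rule_matched"]])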