diff --git a/runtime/databricks/automl_runtime/forecast/deepar/model.py b/runtime/databricks/automl_runtime/forecast/deepar/model.py index 9263640..137c37a 100644 --- a/runtime/databricks/automl_runtime/forecast/deepar/model.py +++ b/runtime/databricks/automl_runtime/forecast/deepar/model.py @@ -42,7 +42,7 @@ class DeepARModel(ForecastModel): DeepAR mlflow model wrapper for forecasting. """ - def __init__(self, model: PyTorchPredictor, horizon: int, frequency: str, + def __init__(self, model: PyTorchPredictor, horizon: int, frequency_unit: str, frequency_quantity: int, num_samples: int, target_col: str, time_col: str, id_cols: Optional[List[str]] = None) -> None: @@ -50,7 +50,8 @@ def __init__(self, model: PyTorchPredictor, horizon: int, frequency: str, Initialize the DeepAR mlflow Python model wrapper :param model: DeepAR model :param horizon: the number of periods to forecast forward - :param frequency: the frequency of the time series + :param frequency_unit: the frequency unit of the time series + :param frequency_quantity: the frequency quantity of the time series :param num_samples: the number of samples to draw from the distribution :param target_col: the target column name :param time_col: the time column name @@ -60,7 +61,8 @@ def __init__(self, model: PyTorchPredictor, horizon: int, frequency: str, super().__init__() self._model = model self._horizon = horizon - self._frequency = frequency + self._frequency_unit = frequency_unit + self._frequency_quantity = frequency_quantity self._num_samples = num_samples self._target_col = target_col self._time_col = time_col @@ -128,7 +130,8 @@ def predict_samples(self, model_input_transformed = set_index_and_fill_missing_time_steps(model_input, self._time_col, - self._frequency, + self._frequency_unit, + self._frequency_quantity, self._id_cols) test_ds = PandasDataset(model_input_transformed, target=self._target_col) diff --git a/runtime/databricks/automl_runtime/forecast/deepar/utils.py b/runtime/databricks/automl_runtime/forecast/deepar/utils.py index e49b5d0..016de93 100644 --- a/runtime/databricks/automl_runtime/forecast/deepar/utils.py +++ b/runtime/databricks/automl_runtime/forecast/deepar/utils.py @@ -18,7 +18,10 @@ import pandas as pd -def validate_and_generate_index(df: pd.DataFrame, time_col: str, frequency: str): +def validate_and_generate_index(df: pd.DataFrame, + time_col: str, + frequency_unit: str, + frequency_quantity: int): """ Generate a complete time index for the given DataFrame based on the specified frequency. - Ensures the time column is in datetime format. @@ -26,12 +29,13 @@ def validate_and_generate_index(df: pd.DataFrame, time_col: str, frequency: str) - Generates a new time index from the minimum to the maximum timestamp in the data. :param df: The input DataFrame containing the time column. :param time_col: The name of the time column. - :param frequency: The frequency of the time series. + :param frequency_unit: The frequency unit of the time series. + :param frequency_quantity: The frequency quantity of the time series. :return: A complete time index covering the full range of the dataset. :raises ValueError: If the day-of-month pattern is inconsistent for "MS" frequency. 
""" - if frequency.upper() != "MS": - return pd.date_range(df[time_col].min(), df[time_col].max(), freq=frequency) + if frequency_unit.upper() != "MS": + return pd.date_range(df[time_col].min(), df[time_col].max(), freq=f"{frequency_quantity}{frequency_unit}") df[time_col] = pd.to_datetime(df[time_col]) # Ensure datetime format @@ -63,7 +67,8 @@ def validate_and_generate_index(df: pd.DataFrame, time_col: str, frequency: str) return new_index_full def set_index_and_fill_missing_time_steps(df: pd.DataFrame, time_col: str, - frequency: str, + frequency_unit: str, + frequency_quantity: int, id_cols: Optional[List[str]] = None): """ Transform the input dataframe to an acceptable format for the GluonTS library. @@ -73,20 +78,21 @@ def set_index_and_fill_missing_time_steps(df: pd.DataFrame, time_col: str, :param df: the input dataframe that contains time_col :param time_col: time column name - :param frequency: the frequency of the time series + :param frequency_unit: the frequency unit of the time series + :param frequency_quantity: the frequency quantity of the time series :param id_cols: the column names of the identity columns for multi-series time series; None for single series :return: single-series - transformed dataframe; multi-series - dictionary of transformed dataframes, each key is the (concatenated) id of the time series """ total_min, total_max = df[time_col].min(), df[time_col].max() - # We need to adjust the frequency for pd.date_range if it is weekly, + # We need to adjust the frequency_unit for pd.date_range if it is weekly, # otherwise it would always be "W-SUN" - if frequency.upper() == "W": + if frequency_unit.upper() == "W": weekday_name = total_min.strftime("%a").upper() # e.g., "FRI" - frequency = f"W-{weekday_name}" + frequency_unit = f"W-{weekday_name}" - new_index_full = validate_and_generate_index(df=df, time_col=time_col, frequency=frequency) + valid_index = validate_and_generate_index(df=df, time_col=time_col, frequency_unit=frequency_unit, frequency_quantity=frequency_quantity) if id_cols is not None: df_dict = {} @@ -96,16 +102,16 @@ def set_index_and_fill_missing_time_steps(df: pd.DataFrame, time_col: str, else: ts_id = str(grouped_id) df_dict[ts_id] = (grouped_df.set_index(time_col).sort_index() - .reindex(new_index_full).drop(id_cols, axis=1)) + .reindex(valid_index).drop(id_cols, axis=1)) return df_dict df = df.set_index(time_col).sort_index() # Fill in missing time steps between the min and max time steps - df = df.reindex(new_index_full) + df = df.reindex(valid_index) - if frequency.upper() == "MS": + if frequency_unit.upper() == "MS": # Truncate the day of month to avoid issues with pandas frequency check df = df.to_period("M") diff --git a/runtime/databricks/automl_runtime/forecast/pmdarima/model.py b/runtime/databricks/automl_runtime/forecast/pmdarima/model.py index 64e5377..cc4116c 100644 --- a/runtime/databricks/automl_runtime/forecast/pmdarima/model.py +++ b/runtime/databricks/automl_runtime/forecast/pmdarima/model.py @@ -64,18 +64,19 @@ def model_env(self): return ARIMA_CONDA_ENV @staticmethod - def _get_ds_indices(start_ds: pd.Timestamp, periods: int, frequency: str) -> pd.DatetimeIndex: + def _get_ds_indices(start_ds: pd.Timestamp, periods: int, frequency_unit: str, frequency_quantity: int) -> pd.DatetimeIndex: """ Create a DatetimeIndex with specified starting time and frequency, whose length is the given periods. :param start_ds: the pd.Timestamp as the start of the DatetimeIndex. :param periods: the length of the DatetimeIndex. 
- :param frequency: the frequency of the DatetimeIndex. + :param frequency_unit: the frequency unit of the DatetimeIndex. + :param frequency_quantity: the frequency quantity of the DatetimeIndex. :return: a DatetimeIndex. """ ds_indices = pd.date_range( start=start_ds, periods=periods, - freq=pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[frequency]) + freq=pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[frequency_unit]) * frequency_quantity ) modified_start_ds = ds_indices.min() if start_ds != modified_start_ds: @@ -89,14 +90,15 @@ class ArimaModel(AbstractArimaModel): ARIMA mlflow model wrapper for univariate forecasting. """ - def __init__(self, pickled_model: bytes, horizon: int, frequency: str, - start_ds: pd.Timestamp, end_ds: pd.Timestamp, + def __init__(self, pickled_model: bytes, horizon: int, frequency_unit: str, + frequency_quantity: int, start_ds: pd.Timestamp, end_ds: pd.Timestamp, time_col: str, exogenous_cols: Optional[List[str]] = None) -> None: """ Initialize the mlflow Python model wrapper for ARIMA. :param pickled_model: the pickled ARIMA model as a bytes object. :param horizon: int number of periods to forecast forward. - :param frequency: the frequency of the time series + :param frequency_unit: the frequency unit of the time series + :param frequency_quantity: the frequency quantity of the time series :param start_ds: the start time of training data :param end_ds: the end time of training data :param time_col: the column name of the time column @@ -106,7 +108,8 @@ def __init__(self, pickled_model: bytes, horizon: int, frequency: str, super().__init__() self._pickled_model = pickled_model self._horizon = horizon - self._frequency = OFFSET_ALIAS_MAP[frequency] + self._frequency_unit = OFFSET_ALIAS_MAP[frequency_unit] + self._frequency_quantity = frequency_quantity self._start_ds = pd.to_datetime(start_ds) self._end_ds = pd.to_datetime(end_ds) self._time_col = time_col @@ -157,7 +160,8 @@ def make_future_dataframe(self, horizon: int = None, include_history: bool = Tru start_time=self._start_ds, end_time=self._end_ds, horizon=horizon or self._horizon, - frequency=self._frequency, + frequency_unit=self._frequency_unit, + frequency_quantity=self._frequency_quantity, include_history=include_history ) @@ -192,7 +196,7 @@ def _predict_impl(self, input_df: pd.DataFrame) -> pd.DataFrame: ) # Check if the time has correct frequency consistency = df["ds"].apply(lambda x: - is_frequency_consistency(self._start_ds, x, self._frequency) + is_frequency_consistency(self._start_ds, x, self._frequency_unit, self._frequency_quantity) ).all() if not consistency: raise MlflowException( @@ -203,7 +207,7 @@ def _predict_impl(self, input_df: pd.DataFrame) -> pd.DataFrame: ) preds_pds = [] # Out-of-sample prediction if needed - horizon = calculate_period_differences(self._end_ds, max(df["ds"]), self._frequency) + horizon = calculate_period_differences(self._end_ds, max(df["ds"]), self._frequency_unit, self._frequency_quantity) if horizon > 0: X_future = df[df["ds"] > self._end_ds].set_index("ds") future_pd = self._forecast( @@ -229,8 +233,8 @@ def _predict_in_sample( end_ds: pd.Timestamp = None, X: pd.DataFrame = None) -> pd.DataFrame: if start_ds and end_ds: - start_idx = calculate_period_differences(self._start_ds, start_ds, self._frequency) - end_idx = calculate_period_differences(self._start_ds, end_ds, self._frequency) + start_idx = calculate_period_differences(self._start_ds, start_ds, self._frequency_unit, self._frequency_quantity) + end_idx = calculate_period_differences(self._start_ds, end_ds, 
self._frequency_unit, self._frequency_quantity) else: start_ds = self._start_ds end_ds = self._end_ds @@ -242,8 +246,8 @@ def _predict_in_sample( start=start_idx, end=end_idx, return_conf_int=True) - periods = calculate_period_differences(self._start_ds, end_ds, self._frequency) + 1 - ds_indices = self._get_ds_indices(start_ds=self._start_ds, periods=periods, frequency=self._frequency)[start_idx:] + periods = calculate_period_differences(self._start_ds, end_ds, self._frequency_unit, self._frequency_quantity) + 1 + ds_indices = self._get_ds_indices(start_ds=self._start_ds, periods=periods, frequency_unit=self._frequency_unit, frequency_quantity=self._frequency_quantity)[start_idx:] in_sample_pd = pd.DataFrame({'ds': ds_indices, 'yhat': preds_in_sample}) in_sample_pd[["yhat_lower", "yhat_upper"]] = conf_in_sample return in_sample_pd @@ -257,7 +261,7 @@ def _forecast( horizon, X=X, return_conf_int=True) - ds_indices = self._get_ds_indices(start_ds=self._end_ds, periods=horizon + 1, frequency=self._frequency)[1:] + ds_indices = self._get_ds_indices(start_ds=self._end_ds, periods=horizon + 1, frequency_unit=self._frequency_unit, frequency_quantity=self._frequency_quantity)[1:] preds_pd = pd.DataFrame({'ds': ds_indices, 'yhat': preds}) preds_pd[["yhat_lower", "yhat_upper"]] = conf return preds_pd @@ -268,14 +272,15 @@ class MultiSeriesArimaModel(AbstractArimaModel): ARIMA mlflow model wrapper for multivariate forecasting. """ - def __init__(self, pickled_model_dict: Dict[Tuple, bytes], horizon: int, frequency: str, + def __init__(self, pickled_model_dict: Dict[Tuple, bytes], horizon: int, frequency_unit: str, frequency_quantity: int, start_ds_dict: Dict[Tuple, pd.Timestamp], end_ds_dict: Dict[Tuple, pd.Timestamp], time_col: str, id_cols: List[str], exogenous_cols: Optional[List[str]] = None) -> None: """ Initialize the mlflow Python model wrapper for multiseries ARIMA. :param pickled_model_dict: the dictionary of binarized ARIMA models for different time series. :param horizon: int number of periods to forecast forward. - :param frequency: the frequency of the time series + :param frequency_unit: the frequency unit of the time series + :param frequency_quantity: the frequency quantity of the time series :param start_ds_dict: the dictionary of the starting time of each time series in training data. :param end_ds_dict: the dictionary of the end time of each time series in training data. 
:param time_col: the column name of the time column @@ -286,7 +291,8 @@ def __init__(self, pickled_model_dict: Dict[Tuple, bytes], horizon: int, frequen super().__init__() self._pickled_models = pickled_model_dict self._horizon = horizon - self._frequency = frequency + self._frequency_unit = frequency_unit + self._frequency_quantity = frequency_quantity self._starts = start_ds_dict self._ends = end_ds_dict self._time_col = time_col @@ -329,7 +335,8 @@ def make_future_dataframe( start_time=self._starts, end_time=self._ends, horizon=horizon, - frequency=self._frequency, + frequency_unit=self._frequency_unit, + frequency_quantity=self._frequency_quantity, include_history=include_history, groups=groups, identity_column_names=self._id_cols @@ -360,7 +367,7 @@ def _predict_timeseries_single_id( horizon: int, include_history: bool = True, df: Optional[pd.DataFrame] = None) -> pd.DataFrame: - arima_model_single_id = ArimaModel(self._pickled_models[id_], self._horizon, self._frequency, + arima_model_single_id = ArimaModel(self._pickled_models[id_], self._horizon, self._frequency_unit, self._frequency_quantity, self._starts[id_], self._ends[id_], self._time_col, self._exogenous_cols) preds_df = arima_model_single_id.predict_timeseries(horizon, include_history, df) for id, col_name in zip(id_, self._id_cols): @@ -401,7 +408,8 @@ def _predict_single_id(self, df: pd.DataFrame) -> pd.DataFrame: id_ = df["ts_id"].to_list()[0] arima_model_single_id = ArimaModel(self._pickled_models[id_], self._horizon, - self._frequency, + self._frequency_unit, + self._frequency_quantity, self._starts[id_], self._ends[id_], self._time_col, diff --git a/runtime/databricks/automl_runtime/forecast/pmdarima/training.py b/runtime/databricks/automl_runtime/forecast/pmdarima/training.py index 23b22c9..7cb1b35 100644 --- a/runtime/databricks/automl_runtime/forecast/pmdarima/training.py +++ b/runtime/databricks/automl_runtime/forecast/pmdarima/training.py @@ -36,7 +36,7 @@ class ArimaEstimator: def __init__(self, horizon: int, frequency_unit: str, metric: str, seasonal_periods: List[int], num_folds: int = 20, max_steps: int = 150, exogenous_cols: Optional[List[str]] = None, - split_cutoff: Optional[pd.Timestamp] = None) -> None: + split_cutoff: Optional[pd.Timestamp] = None, frequency_quantity: int = 1) -> None: """ :param horizon: Number of periods to forecast forward :param frequency_unit: Frequency of the time series @@ -47,12 +47,14 @@ def __init__(self, horizon: int, frequency_unit: str, metric: str, seasonal_peri :param exogenous_cols: Optional list of column names of exogenous variables. If provided, these columns are used as additional features in arima model. :param split_cutoff: Optional cutoff specified by user. If provided, it is the starting point of cutoffs for cross validation. For tuning job, it is the cutoff between train and validate split. For training job, it is the cutoff between validate and test split. + :param frequency_quantity: The number of frequency units in the frequency. Default is 1.
""" self._horizon = horizon self._frequency_unit = OFFSET_ALIAS_MAP[frequency_unit] + self._frequency_quantity = frequency_quantity self._metric = metric self._seasonal_periods = seasonal_periods self._num_folds = num_folds @@ -70,14 +72,14 @@ def fit(self, df: pd.DataFrame) -> pd.DataFrame: history_pd["ds"] = pd.to_datetime(history_pd["ds"]) # Check if the time has consistent frequency - self._validate_ds_freq(history_pd, self._frequency_unit) + self._validate_ds_freq(history_pd, self._frequency_unit, self._frequency_quantity) history_periods = utils.calculate_period_differences( - history_pd['ds'].min(), history_pd['ds'].max(), self._frequency_unit + history_pd['ds'].min(), history_pd['ds'].max(), self._frequency_unit, self._frequency_quantity ) if history_periods + 1 != history_pd['ds'].size: # Impute missing time steps - history_pd = self._fill_missing_time_steps(history_pd, self._frequency_unit) + history_pd = self._fill_missing_time_steps(history_pd, self._frequency_unit, self._frequency_quantity) # Tune seasonal periods @@ -87,26 +89,28 @@ def fit(self, df: pd.DataFrame) -> pd.DataFrame: try: # this check mirrors the the default behavior by prophet if history_periods < 2 * m: - _logger.warning(f"Skipping seasonal_period={m} ({self._frequency_unit}). Dataframe timestamps must span at least two seasonality periods, but only spans {history_periods} {self._frequency_unit}""") + _logger.warning(f"Skipping seasonal_period={m} ({self._frequency_quantity}{self._frequency_unit}). Dataframe timestamps must span at least two seasonality periods, but only spans {history_periods} {self._frequency_quantity}{self._frequency_unit}""") continue # Prophet also rejects the seasonality periods if the seasonality period timedelta is less than the shortest timedelta in the dataframe. 
# However, this cannot happen in ARIMA because _fill_missing_time_steps imputes values for each _frequency_unit, # so the minimum valid seasonality period is always 1 - validation_horizon = utils.get_validation_horizon(history_pd, self._horizon, self._frequency_unit) + validation_horizon = utils.get_validation_horizon(history_pd, self._horizon, self._frequency_unit, self._frequency_quantity) if self._split_cutoff: cutoffs = utils.generate_custom_cutoffs( history_pd, horizon=validation_horizon, - unit=self._frequency_unit, - split_cutoff=self._split_cutoff + frequency_unit=self._frequency_unit, + split_cutoff=self._split_cutoff, + frequency_quantity=self._frequency_quantity, ) else: cutoffs = utils.generate_cutoffs( history_pd, horizon=validation_horizon, - unit=self._frequency_unit, + frequency_unit=self._frequency_unit, num_folds=self._num_folds, + frequency_quantity=self._frequency_quantity, ) result = self._fit_predict(history_pd, cutoffs=cutoffs, seasonal_period=m, max_steps=self._max_steps) @@ -150,9 +154,9 @@ def _fit_predict(self, df: pd.DataFrame, cutoffs: List[pd.Timestamp], seasonal_p return {"metrics": metrics, "model": arima_model} @staticmethod - def _fill_missing_time_steps(df: pd.DataFrame, frequency: str): + def _fill_missing_time_steps(df: pd.DataFrame, frequency_unit: str, frequency_quantity: int): # Forward fill missing time steps - df_filled = df.set_index("ds").resample(rule=OFFSET_ALIAS_MAP[frequency]).pad().reset_index() + df_filled = df.set_index("ds").resample(rule=f"{frequency_quantity}{OFFSET_ALIAS_MAP[frequency_unit]}").pad().reset_index() start_ds, modified_start_ds = df["ds"].min(), df_filled["ds"].min() if start_ds != modified_start_ds: offset = modified_start_ds - start_ds @@ -160,12 +164,12 @@ def _fill_missing_time_steps(df: pd.DataFrame, frequency: str): return df_filled @staticmethod - def _validate_ds_freq(df: pd.DataFrame, frequency: str): + def _validate_ds_freq(df: pd.DataFrame, frequency_unit: str, frequency_quantity: int): start_ds = df["ds"].min() consistency = df["ds"].apply(lambda x: - utils.is_frequency_consistency(start_ds, x, frequency) + utils.is_frequency_consistency(start_ds, x, frequency_unit, frequency_quantity) ).all() if not consistency: raise ValueError( - f"Input time column includes different frequency than the specified frequency {frequency}." + f"Input time column includes different frequency than the specified frequency {frequency_quantity}{frequency_unit}." ) diff --git a/runtime/databricks/automl_runtime/forecast/prophet/forecast.py b/runtime/databricks/automl_runtime/forecast/prophet/forecast.py index 706700f..28c4a5e 100644 --- a/runtime/databricks/automl_runtime/forecast/prophet/forecast.py +++ b/runtime/databricks/automl_runtime/forecast/prophet/forecast.py @@ -38,10 +38,12 @@ class ProphetHyperParams(Enum): def _prophet_fit_predict(params: Dict[str, Any], history_pd: pd.DataFrame, - horizon: int, frequency: str, cutoffs: List[pd.Timestamp], + horizon: int, frequency_unit: str, cutoffs: List[pd.Timestamp], interval_width: int, primary_metric: str, country_holidays: Optional[str] = None, - regressors = None, **prophet_kwargs) -> Dict[str, Any]: + regressors = None, + frequency_quantity: int = 1, + **prophet_kwargs) -> Dict[str, Any]: """ Training function for hyperparameter tuning with hyperopt @@ -49,7 +51,8 @@ def _prophet_fit_predict(params: Dict[str, Any], history_pd: pd.DataFrame, :param history_pd: pd.DataFrame containing the history. 
Must have columns ds (date type) and y, the time series :param horizon: Forecast horizon_timedelta - :param frequency: Frequency of the time series + :param frequency_unit: Frequency unit of the time series + :param frequency_quantity: the number of time units that make up a single period of the time series. For now, only 1/5/10/15/30 minutes, 1 hour, 1 day, 1 week, 1 month, 1 quarter, 1 year are supported. :param num_folds: Number of folds for cross validation :param interval_width: Width of the uncertainty intervals provided for the forecast :param primary_metric: Metric that will be optimized across trials @@ -67,8 +70,8 @@ def _prophet_fit_predict(params: Dict[str, Any], history_pd: pd.DataFrame, model.add_regressor(regressor) model.fit(history_pd, iter=200) - offset_kwarg = DATE_OFFSET_KEYWORD_MAP[OFFSET_ALIAS_MAP[frequency]] - horizon_offset = pd.DateOffset(**offset_kwarg)*horizon + offset_kwarg = DATE_OFFSET_KEYWORD_MAP[OFFSET_ALIAS_MAP[frequency_unit]] + horizon_offset = pd.DateOffset(**offset_kwarg)*frequency_quantity*horizon # Evaluate Metrics df_cv = cross_validation( model, horizon=horizon_offset, cutoffs=cutoffs, disable_tqdm=True @@ -92,7 +95,9 @@ def __init__(self, horizon: int, frequency_unit: str, metric: str, interval_widt max_eval: int = 10, trial_timeout: int = None, random_state: int = 0, is_parallel: bool = True, regressors = None, - split_cutoff: Optional[pd.Timestamp] = None, **prophet_kwargs) -> None: + split_cutoff: Optional[pd.Timestamp] = None, + frequency_quantity: int = 1, + **prophet_kwargs) -> None: """ Initialization @@ -119,6 +124,7 @@ def __init__(self, horizon: int, frequency_unit: str, metric: str, interval_widt """ self._horizon = horizon self._frequency_unit = OFFSET_ALIAS_MAP[frequency_unit] + self._frequency_quantity = frequency_quantity self._metric = metric self._interval_width = interval_width self._country_holidays = country_holidays @@ -144,24 +150,28 @@ def fit(self, df: pd.DataFrame) -> pd.DataFrame: seasonality_mode = ["additive", "multiplicative"] - validation_horizon = utils.get_validation_horizon(df, self._horizon, self._frequency_unit) + validation_horizon = utils.get_validation_horizon(df, self._horizon, self._frequency_unit, self._frequency_quantity) if self._split_cutoff: cutoffs = utils.generate_custom_cutoffs( df.reset_index(drop=True), horizon=validation_horizon, - unit=self._frequency_unit, - split_cutoff=self._split_cutoff + frequency_unit=self._frequency_unit, + split_cutoff=self._split_cutoff, + frequency_quantity=self._frequency_quantity, ) else: cutoffs = utils.generate_cutoffs( df.reset_index(drop=True), horizon=validation_horizon, - unit=self._frequency_unit, + frequency_unit=self._frequency_unit, num_folds=self._num_folds, + frequency_quantity=self._frequency_quantity, ) train_fn = partial(_prophet_fit_predict, history_pd=df, horizon=validation_horizon, - frequency=self._frequency_unit, cutoffs=cutoffs, + frequency_unit=self._frequency_unit, + frequency_quantity=self._frequency_quantity, + cutoffs=cutoffs, interval_width=self._interval_width, primary_metric=self._metric, country_holidays=self._country_holidays, regressors=self._regressors, **self._prophet_kwargs) diff --git a/runtime/databricks/automl_runtime/forecast/prophet/model.py b/runtime/databricks/automl_runtime/forecast/prophet/model.py index f678ce6..0696bef 100644 --- a/runtime/databricks/automl_runtime/forecast/prophet/model.py +++ b/runtime/databricks/automl_runtime/forecast/prophet/model.py @@ -45,21 +45,27 @@ class ProphetModel(ForecastModel): Prophet mlflow 
model wrapper for univariate forecasting. """ - def __init__(self, model_json: Union[Dict[Tuple, str], str], horizon: int, frequency: str, + def __init__(self, + model_json: Union[Dict[Tuple, str], str], + horizon: int, + frequency_unit: str, + frequency_quantity: int, time_col: str) -> None: """ Initialize the mlflow Python model wrapper for mlflow :param model_json: json string of the Prophet model or the dictionary of json strings of Prophet model for multi-series forecasting :param horizon: Int number of periods to forecast forward. - :param frequency: the frequency of the time series + :param frequency_unit: the frequency unit of the time series + :param frequency_quantity: the frequency quantity of the time series :param time_col: the column name of the time column """ self._model_json = model_json self._horizon = horizon - self._frequency = frequency + self._frequency_unit = frequency_unit + self._frequency_quantity = frequency_quantity self._time_col = time_col - self._is_quaterly = is_quaterly_alias(frequency) + self._is_quaterly = is_quaterly_alias(frequency_unit) super().__init__() def load_context(self, context: mlflow.pyfunc.model.PythonModelContext) -> None: @@ -92,7 +98,8 @@ def make_future_dataframe(self, horizon: int = None, include_history: bool = Tru :return: pd.Dataframe that extends forward from the end of self.history for the requested number of periods. """ - offset_kwarg = DATE_OFFSET_KEYWORD_MAP[OFFSET_ALIAS_MAP[self._frequency]] + offset_kwarg = DATE_OFFSET_KEYWORD_MAP[OFFSET_ALIAS_MAP[self._frequency_unit]] + offset_kwarg = {key: value * self._frequency_quantity for key, value in offset_kwarg.items()} return self.model().make_future_dataframe(periods=horizon or self._horizon, freq=pd.DateOffset(**offset_kwarg), include_history=include_history) @@ -144,7 +151,7 @@ class MultiSeriesProphetModel(ProphetModel): """ def __init__(self, model_json: Dict[Tuple, str], timeseries_starts: Dict[Tuple, pd.Timestamp], - timeseries_end: str, horizon: int, frequency: str, time_col: str, id_cols: List[str], + timeseries_end: str, horizon: int, frequency_unit: str, frequency_quantity: int, time_col: str, id_cols: List[str], ) -> None: """ Initialize the mlflow Python model wrapper for mlflow @@ -152,12 +159,14 @@ def __init__(self, model_json: Dict[Tuple, str], timeseries_starts: Dict[Tuple, :param timeseries_starts: the dictionary of pd.Timestamp as the starting time of each time series :param timeseries_end: the end time of the time series :param horizon: int number of periods to forecast forward - :param frequency: the frequency of the time series + :param frequency_unit: the frequency unit of the time series + :param frequency_quantity: the frequency quantity of the time series :param time_col: the column name of the time column :param id_cols: the column names of the identity columns for multi-series time series """ - super().__init__(model_json, horizon, frequency, time_col) - self._frequency = frequency + super().__init__(model_json, horizon, frequency_unit, frequency_quantity, time_col) + self._frequency_unit = frequency_unit + self._frequency_quantity = frequency_quantity self._timeseries_end = timeseries_end self._timeseries_starts = timeseries_starts self._id_cols = id_cols @@ -200,7 +209,8 @@ def make_future_dataframe( start_time=self._timeseries_starts, end_time=end_time, horizon=horizon, - frequency=self._frequency, + frequency_unit=self._frequency_unit, + frequency_quantity=self._frequency_quantity, include_history=include_history, groups=groups, 
identity_column_names=self._id_cols @@ -235,7 +245,8 @@ def predict_timeseries(self, horizon: int = None, include_history: bool = True) start_time=self._timeseries_starts, end_time=end_time, horizon=horizon, - frequency=self._frequency, + frequency_unit=self._frequency_unit, + frequency_quantity=self._frequency_quantity, include_history=include_history, groups=self._model_json.keys(), identity_column_names=self._id_cols diff --git a/runtime/databricks/automl_runtime/forecast/utils.py b/runtime/databricks/automl_runtime/forecast/utils.py index 3b5c394..195d35a 100644 --- a/runtime/databricks/automl_runtime/forecast/utils.py +++ b/runtime/databricks/automl_runtime/forecast/utils.py @@ -26,7 +26,8 @@ def make_future_dataframe( start_time: Union[pd.Timestamp, Dict[Tuple, pd.Timestamp]], end_time: Union[pd.Timestamp, Dict[Tuple, pd.Timestamp]], horizon: int, - frequency: str, + frequency_unit: str, + frequency_quantity: int, include_history: bool = True, groups: List[Tuple] = None, identity_column_names: List[str] = None, @@ -36,14 +37,15 @@ def make_future_dataframe( :param start_time: the dictionary of the starting time of each time series in training data. :param end_time: the dictionary of the end time of each time series in training data. :param horizon: int number of periods to forecast forward. - :param frequency: the frequency of the time series + :param frequency_unit: the frequency unit of the time series + :param frequency_quantity: the multiplier for the frequency. :param include_history: :param groups: the collection of group(s) to generate forecast predictions. :param identity_column_names: Column names of the identity columns :return: pd.DataFrame that extends forward """ if groups is None: - return make_single_future_dataframe(start_time, end_time, horizon, frequency) + return make_single_future_dataframe(start_time, end_time, horizon, frequency_unit, frequency_quantity) future_df_list = [] for group in groups: @@ -55,7 +57,7 @@ def make_future_dataframe( group_end_time = end_time[group] else: group_end_time = end_time - df = make_single_future_dataframe(group_start_time, group_end_time, horizon, frequency, include_history) + df = make_single_future_dataframe(group_start_time, group_end_time, horizon, frequency_unit, frequency_quantity, include_history) for idx, identity_column_name in enumerate(identity_column_names): df[identity_column_name] = group[idx] future_df_list.append(df) @@ -65,7 +67,8 @@ def make_single_future_dataframe( start_time: pd.Timestamp, end_time: pd.Timestamp, horizon: int, - frequency: str, + frequency_unit: str, + frequency_quantity: int, include_history: bool = True, column_name: str = "ds" ) -> pd.DataFrame: @@ -74,29 +77,30 @@ def make_single_future_dataframe( :param start_time: The starting time of time series of the training data. :param end_time: The end time of time series of the training data. :param horizon: Int number of periods to forecast forward. - :param frequency: The frequency of the time series + :param frequency_unit: The frequency unit of the time series + :param frequency_quantity: The frequency quantity of the time series :param include_history: Boolean to include the historical dates in the data frame for predictions. :param column_name: column name of the time column. Default is "ds". 
:return: """ - offset_freq = DATE_OFFSET_KEYWORD_MAP[OFFSET_ALIAS_MAP[frequency]] - unit_offset = pd.DateOffset(**offset_freq) + offset_freq = DATE_OFFSET_KEYWORD_MAP[OFFSET_ALIAS_MAP[frequency_unit]] + timestep_offset = pd.DateOffset(**offset_freq) * frequency_quantity end_time = pd.Timestamp(end_time) if include_history: start_time = start_time else: - start_time = end_time + unit_offset + start_time = end_time + timestep_offset date_rng = pd.date_range( start=start_time, - end=end_time + unit_offset*horizon, - freq=unit_offset + end=end_time + timestep_offset * horizon, + freq=timestep_offset ) return pd.DataFrame(date_rng, columns=[column_name]) -def get_validation_horizon(df: pd.DataFrame, horizon: int, unit: str, frequency_quantity: int = 1) -> int: +def get_validation_horizon(df: pd.DataFrame, horizon: int, frequency_unit: str, frequency_quantity: int = 1) -> int: """ Return validation_horizon, which is the lesser of `horizon` and one quarter of the dataframe's timedelta Since the seasonality period is never more than half of the dataframe's timedelta, @@ -104,7 +108,7 @@ def get_validation_horizon(df: pd.DataFrame, horizon: int, unit: str, frequency_ behavior, and we enforce it for ARIMA.) :param df: pd.DataFrame of the historical data :param horizon: int number of time into the future for forecasting - :param unit: frequency unit of the time series, which must be a pandas offset alias + :param frequency_unit: frequency unit of the time series, which must be a pandas offset alias :param frequency_quantity: int multiplier for the frequency unit, representing the number of `unit`s per time step in the dataframe. This is useful when the time series has a granularity that spans multiple `unit`s (e.g., if `unit='min'` and `frequency_quantity=5`, it means the data @@ -112,7 +116,7 @@ def get_validation_horizon(df: pd.DataFrame, horizon: int, unit: str, frequency_ :return: horizon used for validation, in terms of the input `unit` """ MIN_HORIZONS = 4 # minimum number of horizons in the dataframe - horizon_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[unit]) * horizon * frequency_quantity + horizon_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[frequency_unit]) * horizon * frequency_quantity try: if MIN_HORIZONS * horizon_dateoffset + df["ds"].min() <= df["ds"].max(): @@ -123,7 +127,7 @@ def get_validation_horizon(df: pd.DataFrame, horizon: int, unit: str, frequency_ # In order to calculate the validation horizon, we incrementally add offset # to the start time to the quarter of total timedelta. We did this since # pd.DateOffset does not support divide by operation. - timestep_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[unit]) * frequency_quantity + timestep_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[frequency_unit]) * frequency_quantity max_horizon = 0 cur_timestamp = df["ds"].min() while cur_timestamp + timestep_dateoffset <= df["ds"].max(): @@ -133,36 +137,38 @@ def get_validation_horizon(df: pd.DataFrame, horizon: int, unit: str, frequency_ f"timedelta. 
Validation horizon will be reduced to {max_horizon//MIN_HORIZONS*timestep_dateoffset}.") return max_horizon // MIN_HORIZONS -def generate_cutoffs(df: pd.DataFrame, horizon: int, unit: str, +def generate_cutoffs(df: pd.DataFrame, horizon: int, frequency_unit: str, num_folds: int, seasonal_period: int = 0, - seasonal_unit: Optional[str] = None) -> List[pd.Timestamp]: + seasonal_unit: Optional[str] = None, + frequency_quantity: int = 1) -> List[pd.Timestamp]: """ Generate cutoff times for cross validation with the control of number of folds. :param df: pd.DataFrame of the historical data. :param horizon: int number of time into the future for forecasting. - :param unit: frequency unit of the time series, which must be a pandas offset alias. + :param frequency_unit: frequency unit of the time series, which must be a pandas offset alias. :param num_folds: int number of cutoffs for cross validation. :param seasonal_period: length of the seasonality period. :param seasonal_unit: Optional frequency unit for the seasonal period. If not specified, the function will use the same frequency unit as the time series. + :param frequency_quantity: frequency quantity of the time series. :return: list of pd.Timestamp cutoffs for cross-validation. """ period = max(0.5 * horizon, 1) # avoid empty cutoff buckets # avoid non-integer months, quarters and years. - if unit in NON_DAILY_OFFSET_ALIAS: + if frequency_unit in NON_DAILY_OFFSET_ALIAS: period = int(period) - period_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[unit])*period + period_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[frequency_unit])*frequency_quantity*period else: - offset_kwarg = {list(DATE_OFFSET_KEYWORD_MAP[unit])[0]: period} - period_dateoffset = pd.DateOffset(**offset_kwarg) + offset_kwarg = {list(DATE_OFFSET_KEYWORD_MAP[frequency_unit])[0]: period} + period_dateoffset = pd.DateOffset(**offset_kwarg) * frequency_quantity - horizon_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[unit])*horizon + horizon_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[frequency_unit])*frequency_quantity*horizon if not seasonal_unit: - seasonal_unit = unit + seasonal_unit = frequency_unit - seasonality_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[unit])*seasonal_period + seasonality_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[frequency_unit])*frequency_quantity*seasonal_period # We can not compare DateOffset directly, so we add to start time and compare. initial = seasonality_dateoffset @@ -191,23 +197,24 @@ def generate_cutoffs(df: pd.DataFrame, horizon: int, unit: str, ) return list(reversed(result)) -def generate_custom_cutoffs(df: pd.DataFrame, horizon: int, unit: str, - split_cutoff: pd.Timestamp) -> List[pd.Timestamp]: +def generate_custom_cutoffs(df: pd.DataFrame, horizon: int, frequency_unit: str, + split_cutoff: pd.Timestamp, frequency_quantity: int = 1) -> List[pd.Timestamp]: """ Generate custom cutoff times for cross validation based on user-specified split cutoff. Period (step size) is 1. :param df: pd.DataFrame of the historical data. :param horizon: int number of time into the future for forecasting. - :param unit: frequency unit of the time series, which must be a pandas offset alias. + :param frequency_unit: frequency unit of the time series, which must be a pandas offset alias. :param split_cutoff: the user-specified cutoff, as the starting point of cutoffs. For tuning job, it is the cutoff between train and validate split.
For training job, it is the cutoff between validate and test split. + :param frequency_quantity: frequency quantity of the time series. :return: list of pd.Timestamp cutoffs for cross-validation. """ # TODO: [ML-43528] expose period as input. period = 1 - period_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[unit])*period - horizon_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[unit])*horizon + period_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[frequency_unit])*period*frequency_quantity + horizon_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[frequency_unit])*horizon*frequency_quantity # First cutoff is the cutoff between splits cutoff = split_cutoff @@ -229,7 +236,8 @@ def is_quaterly_alias(freq: str): def is_frequency_consistency( start_time: pd.Timestamp, end_time: pd.Timestamp, - freq:str) -> bool: + frequency_unit: str, + frequency_quantity: int) -> bool: """ Validate the periods given a start time, end time is consistent with given frequency. We consider consistency as only integer frequencies between start and end time, e.g. @@ -239,30 +247,35 @@ def is_frequency_consistency( :param end_time: A pandas timestamp. - :param freq: A string that is accepted by OFFSET_ALIAS_MAP, e.g. 'day', 'month' etc. + :param frequency_unit: A string that is accepted by OFFSET_ALIAS_MAP, e.g. 'day', 'month' etc. + :param frequency_quantity: the multiplier for the frequency. :return: A boolean indicate whether the time interval is evenly divisible by the period. """ - periods = calculate_period_differences(start_time, end_time, freq) - diff = pd.to_datetime(end_time) - pd.DateOffset( - **DATE_OFFSET_KEYWORD_MAP[OFFSET_ALIAS_MAP[freq]] - ) * periods == pd.to_datetime(start_time) + periods = calculate_period_differences(start_time, end_time, frequency_unit, frequency_quantity) + # If the difference between start and end time is divisible by the period time + diff = (pd.to_datetime(end_time) - pd.DateOffset( + **DATE_OFFSET_KEYWORD_MAP[OFFSET_ALIAS_MAP[frequency_unit]] + ) * periods * frequency_quantity) == pd.to_datetime(start_time) return diff def calculate_period_differences( start_time: pd.Timestamp, end_time: pd.Timestamp, - freq:str) -> int: + frequency_unit: str, + frequency_quantity: int) -> int: """ Calculate the periods given a start time, end time and period frequency. :param start_time: A pandas timestamp. :param end_time: A pandas timestamp. - :param freq: A string that is accepted by OFFSET_ALIAS_MAP, e.g. 'day', 'month' etc. + :param frequency_unit: A string that is accepted by OFFSET_ALIAS_MAP, e.g. 'day', 'month' etc. + :param frequency_quantity: An integer that is the multiplier for the frequency. :return: A pd.Series indicates the round-down integer period calculated. """ start_time = pd.to_datetime(start_time) end_time = pd.to_datetime(end_time) - freq_alias = PERIOD_ALIAS_MAP[OFFSET_ALIAS_MAP[freq]] - return (end_time.to_period(freq_alias) - start_time.to_period(freq_alias)).n + freq_alias = PERIOD_ALIAS_MAP[OFFSET_ALIAS_MAP[frequency_unit]] + # Intentionally floor the value; the consistency check that follows uses it to detect timestamps that do not align with the declared frequency.
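+    # For example, with frequency_unit='min' and frequency_quantity=5, the
+    # timestamps 00:00 and 00:07 are seven one-minute periods apart, so this
+    # function returns 7 // 5 == 1; is_frequency_consistency then finds that
+    # 00:07 minus 1 * 5min does not land back on 00:00, and reports the series
+    # as inconsistent with the declared 5-minute frequency.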
+ return (end_time.to_period(freq_alias) - start_time.to_period(freq_alias)).n // frequency_quantity diff --git a/runtime/tests/automl_runtime/forecast/deepar/model_test.py b/runtime/tests/automl_runtime/forecast/deepar/model_test.py index 44894b9..bee823d 100644 --- a/runtime/tests/automl_runtime/forecast/deepar/model_test.py +++ b/runtime/tests/automl_runtime/forecast/deepar/model_test.py @@ -20,6 +20,7 @@ import pandas as pd import torch import torch.nn as nn +from parameterized import parameterized from gluonts.dataset.field_names import FieldName from gluonts.transform import InstanceSplitter, TestSplitSampler from gluonts.torch.model.predictor import PyTorchPredictor @@ -93,7 +94,8 @@ def test_model_save_and_load_single_series(self): deepar_model = DeepARModel( model=self.model, horizon=self.prediction_length, - frequency="d", + frequency_unit="d", + frequency_quantity=1, num_samples=1, target_col=target_col, time_col=time_col, @@ -137,7 +139,8 @@ def test_model_save_and_load_multi_series(self): model=self.model, horizon=self.prediction_length, num_samples=1, - frequency="d", + frequency_unit="d", + frequency_quantity=1, target_col=target_col, time_col=time_col, id_cols=[id_col], @@ -184,7 +187,8 @@ def test_model_save_and_load_multi_series_multi_id_cols(self): model=self.model, horizon=self.prediction_length, num_samples=1, - frequency="d", + frequency_unit="d", + frequency_quantity=1, target_col=target_col, time_col=time_col, id_cols=id_cols, @@ -230,7 +234,8 @@ def test_model_prediction_with_duplicate_timestamps(self): deepar_model = DeepARModel( model=self.model, horizon=self.prediction_length, - frequency="d", + frequency_unit="d", + frequency_quantity=1, num_samples=1, target_col=target_col, time_col=time_col, @@ -273,7 +278,8 @@ def test_model_prediction_with_monthly_data(self): deepar_model = DeepARModel( model=self.model, horizon=self.prediction_length, - frequency="MS", + frequency_unit="MS", + frequency_quantity=1, num_samples=1, target_col=target_col, time_col=time_col, @@ -305,4 +311,44 @@ def test_model_prediction_with_monthly_data(self): # Verify the prediction output format self.assertEqual(pred_df.columns.tolist(), [time_col, "yhat"]) self.assertEqual(len(pred_df), self.prediction_length) - self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max()) \ No newline at end of file + self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max()) + + @parameterized.expand([(1,), (5,), (10,), (15,), (30,)]) + def test_model_prediction_with_multiple_minutes_frequency(self, frequency_quantity): + target_col = "sales" + time_col = "date" + + deepar_model = DeepARModel( + model=self.model, + horizon=self.prediction_length, + frequency_unit="min", + frequency_quantity=frequency_quantity, + num_samples=1, + target_col=target_col, + time_col=time_col, + ) + + # Create sample input at the parameterized minute frequency + dates = pd.date_range(start="2020-10-01", periods=6, freq=f"{frequency_quantity}min") + + sales = [10, 20, 30, + 60, 90, 100] + + sample_input = pd.DataFrame({ + time_col: dates, + target_col: sales + }) + + with mlflow.start_run() as run: + mlflow_deepar_log_model(deepar_model, sample_input) + + run_id = run.info.run_id + + # Load the model and predict + loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") + pred_df = loaded_model.predict(sample_input) + + # Verify the prediction output format + self.assertEqual(pred_df.columns.tolist(), [time_col, "yhat"]) + self.assertEqual(len(pred_df), self.prediction_length) +
self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max()) diff --git a/runtime/tests/automl_runtime/forecast/deepar/utils_test.py b/runtime/tests/automl_runtime/forecast/deepar/utils_test.py index 98aa75b..3aebf1e 100644 --- a/runtime/tests/automl_runtime/forecast/deepar/utils_test.py +++ b/runtime/tests/automl_runtime/forecast/deepar/utils_test.py @@ -16,6 +16,7 @@ import unittest import pandas as pd +from parameterized import parameterized from databricks.automl_runtime.forecast.deepar.utils import set_index_and_fill_missing_time_steps @@ -39,7 +40,7 @@ def test_single_series_filled(self): ) dropped_df = base_df.drop([4, 5]).reset_index(drop=True) - transformed_df = set_index_and_fill_missing_time_steps(dropped_df, time_col, "D") + transformed_df = set_index_and_fill_missing_time_steps(dropped_df, time_col, "D", 1) expected_df = base_df.copy() expected_df.loc[[4, 5], target_col] = float('nan') @@ -68,7 +69,7 @@ def test_multi_series_filled(self): dropped_df = pd.concat([dropped_base_df.copy(), dropped_base_df.copy()], ignore_index=True) dropped_df[id_col] = [1] * (num_rows_per_ts - 2) + [2] * (num_rows_per_ts - 2) - transformed_df_dict = set_index_and_fill_missing_time_steps(dropped_df, time_col, "D", id_cols=[id_col]) + transformed_df_dict = set_index_and_fill_missing_time_steps(dropped_df, time_col, "D", 1, id_cols=[id_col]) self.assertEqual(transformed_df_dict.keys(), {"1", "2"}) expected_first_df = base_df.copy() @@ -100,7 +101,7 @@ def test_multi_series_multi_id_cols_filled(self): dropped_df[id_cols[0]] = ([1] * (num_rows_per_ts - 2) + [2] * (num_rows_per_ts - 2)) * 2 dropped_df[id_cols[1]] = [1] * (2 * (num_rows_per_ts - 2)) + [2] * (2 * (num_rows_per_ts - 2)) - transformed_df_dict = set_index_and_fill_missing_time_steps(dropped_df, time_col, "D", id_cols=id_cols) + transformed_df_dict = set_index_and_fill_missing_time_steps(dropped_df, time_col, "D", 1, id_cols=id_cols) self.assertEqual(transformed_df_dict.keys(), {"1-1", "1-2", "2-1", "2-2"}) expected_first_df = base_df.copy() @@ -133,7 +134,8 @@ def test_single_series_week_day_index(self): transformed_df = set_index_and_fill_missing_time_steps( dropped_df, time_col, - "W" # Weekly frequency **without** specifying Friday + "W", # Weekly frequency **without** specifying Friday + 1 ) # Create expected dataframe @@ -168,7 +170,8 @@ def test_single_series_month_start_index(self): transformed_df = set_index_and_fill_missing_time_steps( dropped_df, time_col, - "MS" # Monthly frequency + "MS", # Monthly frequency + 1 ) # Create expected dataframe @@ -204,7 +207,8 @@ def test_single_series_month_mid_index(self): transformed_df = set_index_and_fill_missing_time_steps( dropped_df, time_col, - "MS" + "MS", + 1 ) # Create expected dataframe @@ -241,7 +245,8 @@ def test_single_series_month_end_index(self): transformed_df = set_index_and_fill_missing_time_steps( dropped_df, time_col, - "MS" # Monthly frequency + "MS", # Monthly frequency + 1 ) # Create expected dataframe @@ -252,3 +257,35 @@ def test_single_series_month_end_index(self): # Assert equality pd.testing.assert_frame_equal(transformed_df, expected_df) + + @parameterized.expand([(1,), (5,), (10,), (15,), (30,)]) + def test_single_series_with_multiple_minute_index(self, frequency_quantity): + target_col = "sales" + time_col = "date" + num_rows = 6 + + base_dates = pd.date_range(start="2020-10-01", periods=num_rows, freq=f"{frequency_quantity}min") + + base_df = pd.DataFrame({ + time_col: base_dates, + target_col: range(num_rows) + }) + + # Create a dataframe with 
missing time steps (drop the 3rd and 4th rows) + dropped_df = base_df.drop([3, 4]).reset_index(drop=True) + + # Transform the dataframe + transformed_df = set_index_and_fill_missing_time_steps( + dropped_df, + time_col, + "min", + frequency_quantity + ) + + # Create expected dataframe + expected_df = base_df.copy() + expected_df.loc[[3, 4], target_col] = float('nan') + expected_df = expected_df.set_index(time_col).rename_axis(None).asfreq(f"{frequency_quantity}min") + + # Assert equality + pd.testing.assert_frame_equal(transformed_df, expected_df) diff --git a/runtime/tests/automl_runtime/forecast/pmdarima/diagnostics_test.py b/runtime/tests/automl_runtime/forecast/pmdarima/diagnostics_test.py index f15f548..bd4380c 100644 --- a/runtime/tests/automl_runtime/forecast/pmdarima/diagnostics_test.py +++ b/runtime/tests/automl_runtime/forecast/pmdarima/diagnostics_test.py @@ -43,7 +43,7 @@ class TestDiagnostics(unittest.TestCase): (df_with_exogenous, ["x1", "x2"]) ]) def test_cross_validation_success(self, df, exogenous_cols): - cutoffs = generate_cutoffs(df, horizon=3, unit="D", seasonal_period=1, seasonal_unit="D", num_folds=3) + cutoffs = generate_cutoffs(df, horizon=3, frequency_unit="D", seasonal_period=1, seasonal_unit="D", num_folds=3) train_df = df[df["ds"] <= cutoffs[0]].set_index("ds") y_train = train_df[["y"]] X_train = train_df.drop(["y"], axis=1) diff --git a/runtime/tests/automl_runtime/forecast/pmdarima/model_test.py b/runtime/tests/automl_runtime/forecast/pmdarima/model_test.py index 3c6d0d4..5918d29 100644 --- a/runtime/tests/automl_runtime/forecast/pmdarima/model_test.py +++ b/runtime/tests/automl_runtime/forecast/pmdarima/model_test.py @@ -41,7 +41,8 @@ def setUp(self) -> None: self.start_ds = pd.Timestamp("2020-10-01") self.horizon = 1 self.freq = 'W' - dates = AbstractArimaModel._get_ds_indices(self.start_ds, periods=self.num_rows, frequency=self.freq) + self.frequency_quantity = 1 + dates = AbstractArimaModel._get_ds_indices(self.start_ds, periods=self.num_rows, frequency_unit=self.freq, frequency_quantity=self.frequency_quantity) self.df = pd.concat([ pd.Series(dates, name='date'), pd.Series(range(self.num_rows), name="y") @@ -51,7 +52,8 @@ def setUp(self) -> None: pickled_model = pickle.dumps(model) self.arima_model = ArimaModel(pickled_model, horizon=self.horizon, - frequency=self.freq, + frequency_unit=self.freq, + frequency_quantity=self.frequency_quantity, start_ds=self.start_ds, end_ds=pd.Timestamp("2020-11-26"), time_col="date") @@ -67,7 +69,8 @@ def test_predict_timeseries_success(self): expected_ds = AbstractArimaModel._get_ds_indices( self.start_ds, periods=self.num_rows + self.horizon, - frequency=self.freq) + frequency_unit=self.freq, + frequency_quantity=self.frequency_quantity) self.assertTrue(expected_columns.issubset(set(forecast_pd.columns))) self.assertEqual(10, forecast_pd.shape[0]) pd.testing.assert_series_equal(pd.Series(expected_ds, name='ds'), forecast_pd["ds"]) @@ -135,8 +138,9 @@ def setUp(self) -> None: self.start_ds = datetime.date(2020, 10, 1) self.horizon = 1 self.freq = 'W' + self.frequency_quantity = 1 dates = AbstractArimaModel._get_ds_indices( - pd.to_datetime(self.start_ds), periods=self.num_rows, frequency=self.freq) + pd.to_datetime(self.start_ds), periods=self.num_rows, frequency_unit=self.freq, frequency_quantity=self.frequency_quantity) self.df = pd.concat([ pd.Series(dates, name='date'), pd.Series(range(self.num_rows), name="y") @@ -146,7 +150,8 @@ def setUp(self) -> None: pickled_model = pickle.dumps(model) self.arima_model =
ArimaModel(pickled_model, horizon=self.horizon, - frequency=self.freq, + frequency_unit=self.freq, + frequency_quantity=self.frequency_quantity, start_ds=self.start_ds, end_ds=pd.Timestamp("2020-11-26"), time_col="date") @@ -168,7 +173,8 @@ def setUp(self) -> None: self.start_ds = pd.Timestamp("2020-10-01") self.horizon = 1 self.freq = 'W' - dates = AbstractArimaModel._get_ds_indices(self.start_ds, periods=self.num_rows, frequency=self.freq) + self.frequency_quantity = 1 + dates = AbstractArimaModel._get_ds_indices(self.start_ds, periods=self.num_rows, frequency_unit=self.freq, frequency_quantity=self.frequency_quantity) self.df = pd.concat([ pd.Series(dates, name='date'), pd.Series(range(self.num_rows), name="y"), @@ -183,7 +189,8 @@ def setUp(self) -> None: pickled_model = pickle.dumps(model) self.arima_model = ArimaModel(pickled_model, horizon=self.horizon, - frequency=self.freq, + frequency_unit=self.freq, + frequency_quantity=self.frequency_quantity, start_ds=self.start_ds, end_ds=pd.Timestamp("2020-11-26"), time_col="date", @@ -227,7 +234,8 @@ def setUp(self) -> None: end_ds_dict = {("1",): pd.Timestamp("2020-09-13"), ("2",): pd.Timestamp("2020-09-13")} self.arima_model = MultiSeriesArimaModel(pickled_model_dict, horizon=1, - frequency='month', + frequency_unit='month', + frequency_quantity=1, start_ds_dict=start_ds_dict, end_ds_dict=end_ds_dict, time_col="date", @@ -310,7 +318,8 @@ def test_make_future_dataframe_multi_ids(self): end_ds_dict = {(1, "1"): pd.Timestamp("2020-09-13"), (2, "1"): pd.Timestamp("2020-09-13")} arima_model = MultiSeriesArimaModel(pickled_model_dict, horizon=1, - frequency='month', + frequency_unit='month', + frequency_quantity=1, start_ds_dict=start_ds_dict, end_ds_dict=end_ds_dict, time_col="date", @@ -350,7 +359,8 @@ def setUp(self) -> None: end_ds_dict = {("1",): pd.Timestamp("2020-09-13"), ("2",): pd.Timestamp("2020-09-13")} self.arima_model = MultiSeriesArimaModel(pickled_model_dict, horizon=1, - frequency='month', + frequency_unit='month', + frequency_quantity=1, start_ds_dict=start_ds_dict, end_ds_dict=end_ds_dict, time_col="date", @@ -404,7 +414,8 @@ def test_get_ds_weekly(self): ds_indices = AbstractArimaModel._get_ds_indices( start_ds=pd.Timestamp("2022-01-01 12:30"), periods=8, - frequency='W') + frequency_unit='W', + frequency_quantity=1) pd.testing.assert_index_equal(expected_ds, ds_indices) def test_get_ds_hourly(self): @@ -418,7 +429,8 @@ def test_get_ds_hourly(self): ds_indices = AbstractArimaModel._get_ds_indices( start_ds=pd.Timestamp("2021-12-10 09:23"), periods=10, - frequency='H') + frequency_unit='H', + frequency_quantity=1) pd.testing.assert_index_equal(expected_ds, ds_indices) @@ -435,7 +447,7 @@ def setUp(self) -> None: self.pickled_model = pickle.dumps(model) def test_mlflow_arima_log_model(self): - arima_model = ArimaModel(self.pickled_model, horizon=1, frequency='d', + arima_model = ArimaModel(self.pickled_model, horizon=1, frequency_unit='d', frequency_quantity=1, start_ds=pd.to_datetime("2020-10-01"), end_ds=pd.to_datetime("2020-10-09"), time_col="date") with mlflow.start_run() as run: @@ -460,7 +472,8 @@ def test_mlflow_arima_log_model_multiseries(self): end_ds_dict = {("1",): pd.Timestamp("2020-10-09"), ("2",): pd.Timestamp("2020-10-09")} multiseries_arima_model = MultiSeriesArimaModel(pickled_model_dict, horizon=1, - frequency='d', + frequency_unit='d', + frequency_quantity=1, start_ds_dict=start_ds_dict, end_ds_dict=end_ds_dict, time_col="date", @@ -497,3 +510,112 @@ def _check_requirements(self, run_id: str): # check if all 
additional dependencies are logged for dependency in ARIMA_ADDITIONAL_PIP_DEPS: self.assertIn(dependency, requirements, f"requirements.txt should contain {dependency} but got {requirements}") + +class TestArimaModelFrequencyQuantity(unittest.TestCase): + + def setUp(self) -> None: + self.num_rows = 9 + self.start_ds = pd.Timestamp("2020-10-01") + self.horizon = 1 + self.freq = 'min' + frequency_quantities = [1, 5, 10, 15, 30] + self.quantity_model_pairs = [] + + for frequency_quantity in frequency_quantities: + dates = AbstractArimaModel._get_ds_indices(self.start_ds, periods=self.num_rows, frequency_unit=self.freq, frequency_quantity=frequency_quantity) + df = pd.concat([ + pd.Series(dates, name='date'), + pd.Series(range(self.num_rows), name="y") + ], axis=1) + model = ARIMA(order=(2, 0, 2), suppress_warnings=True) + model.fit(df.set_index("date")) + pickled_model = pickle.dumps(model) + self.quantity_model_pairs.append((frequency_quantity, ArimaModel(pickled_model, + horizon=self.horizon, + frequency_unit=self.freq, + frequency_quantity=frequency_quantity, + start_ds=self.start_ds, + end_ds=dates.max(), + time_col="date"))) + + def test_make_future_dataframe(self): + for frequency_quantity, arima_model in self.quantity_model_pairs: + future_df = arima_model.make_future_dataframe(include_history=False) + self.assertCountEqual(future_df.columns, {"ds"}) + self.assertEqual(1, future_df.shape[0]) + + def test_predict_timeseries_success(self): + for frequency_quantity, arima_model in self.quantity_model_pairs: + forecast_pd = arima_model.predict_timeseries() + expected_columns = {"yhat", "yhat_lower", "yhat_upper"} + expected_ds = AbstractArimaModel._get_ds_indices( + self.start_ds, + periods=self.num_rows + self.horizon, + frequency_unit=self.freq, + frequency_quantity=frequency_quantity) + self.assertTrue(expected_columns.issubset(set(forecast_pd.columns))) + self.assertEqual(10, forecast_pd.shape[0]) + pd.testing.assert_series_equal(pd.Series(expected_ds, name='ds'), forecast_pd["ds"]) + # Test forecast without history data + forecast_future_pd = arima_model.predict_timeseries(include_history=False) + self.assertEqual(len(forecast_future_pd), self.horizon) + + def test_predict_success(self): + for frequency_quantity, arima_model in self.quantity_model_pairs: + test_df = pd.DataFrame({ + "date": [pd.to_datetime("2020-10-01") + self.num_rows*pd.DateOffset(minutes=frequency_quantity), + pd.to_datetime("2020-10-01") + (self.num_rows+1)*pd.DateOffset(minutes=frequency_quantity)] + }) + expected_test_df = test_df.copy() + yhat = arima_model.predict(context=None, model_input=test_df) + self.assertEqual(2, len(yhat)) + pd.testing.assert_frame_equal(test_df, expected_test_df) # check the input dataframe is unchanged + + def test_predict_success_datetime_date(self): + for _, arima_model in self.quantity_model_pairs: + test_df = pd.DataFrame({ + "date": [datetime.datetime(2020, 10, 1, 6, 0, 0), datetime.datetime(2020, 10, 1, 6, 30, 0)] + }) + expected_test_df = test_df.copy() + yhat = arima_model.predict(context=None, model_input=test_df) + self.assertEqual(2, len(yhat)) + pd.testing.assert_frame_equal(test_df, expected_test_df) # check the input dataframe is unchanged + + def test_predict_success_string(self): + for _, arima_model in self.quantity_model_pairs: + test_df = pd.DataFrame({ + "date": ["2020-10-01 06:00:00", "2020-10-01 06:30:00"] + }) + expected_test_df = test_df.copy() + yhat = arima_model.predict(context=None, model_input=test_df) + self.assertEqual(2, len(yhat)) + 
+            pd.testing.assert_frame_equal(test_df, expected_test_df)  # check the input dataframe is unchanged
+
+    def test_predict_failure_unmatched_frequency(self):
+        for frequency_quantity, arima_model in self.quantity_model_pairs:
+            if frequency_quantity == 1:
+                continue
+            test_df = pd.DataFrame({
+                "date": [pd.to_datetime("2020-10-01 00:00:00"), pd.to_datetime("2020-10-01 00:01:00"), pd.to_datetime("2020-10-01 00:04:00")]
+            })
+            with pytest.raises(MlflowException, match="includes different frequency") as e:
+                arima_model.predict(context=None, model_input=test_df)
+            assert e.value.error_code == ErrorCode.Name(INVALID_PARAMETER_VALUE)
+
+    def test_predict_failure_invalid_time_range(self):
+        for _, arima_model in self.quantity_model_pairs:
+            test_df = pd.DataFrame({
+                "date": [pd.to_datetime("2020-09-30 00:00:00"), pd.to_datetime("2020-10-01 00:01:00")]
+            })
+            with pytest.raises(MlflowException, match="includes time earlier than the history data that the model was "
+                                                      "trained on") as e:
+                arima_model.predict(context=None, model_input=test_df)
+            assert e.value.error_code == ErrorCode.Name(INVALID_PARAMETER_VALUE)
+
+    def test_predict_failure_invalid_time_col_name(self):
+        for _, arima_model in self.quantity_model_pairs:
+            test_df = pd.DataFrame({
+                "invalid_time_col_name": [pd.to_datetime("2020-10-08"), pd.to_datetime("2020-12-10")]
+            })
+            with pytest.raises(MlflowException, match="Input data columns") as e:
+                arima_model.predict(context=None, model_input=test_df)
+            assert e.value.error_code == ErrorCode.Name(INVALID_PARAMETER_VALUE)
diff --git a/runtime/tests/automl_runtime/forecast/pmdarima/training_test.py b/runtime/tests/automl_runtime/forecast/pmdarima/training_test.py
index 1f1d07a..b79ab96 100644
--- a/runtime/tests/automl_runtime/forecast/pmdarima/training_test.py
+++ b/runtime/tests/automl_runtime/forecast/pmdarima/training_test.py
@@ -48,13 +48,37 @@ def setUp(self) -> None:
             pd.Series(np.random.rand(self.num_rows), name="x1"),
             pd.Series(np.random.rand(self.num_rows), name="x2")
         ], axis=1)
+        self.df_with_5_minute_interval = pd.concat([
+            pd.Series(pd.date_range(start="2020-07-05 00:00:00", periods=self.num_rows, freq="5min"), name="ds"),
+            pd.Series(np.random.rand(self.num_rows), name="y"),
+        ], axis=1)
+        self.df_with_10_minute_interval = pd.concat([
+            pd.Series(pd.date_range(start="2020-07-05 00:00:00", periods=self.num_rows, freq="10min"), name="ds"),
+            pd.Series(np.random.rand(self.num_rows), name="y"),
+        ], axis=1)
+        self.df_with_15_minute_interval = pd.concat([
+            pd.Series(pd.date_range(start="2020-07-05 00:00:00", periods=self.num_rows, freq="15min"), name="ds"),
+            pd.Series(np.random.rand(self.num_rows), name="y"),
+        ], axis=1)
+        self.df_with_30_minute_interval = pd.concat([
+            pd.Series(pd.date_range(start="2020-07-05 00:00:00", periods=self.num_rows, freq="30min"), name="ds"),
+            pd.Series(np.random.rand(self.num_rows), name="y"),
+        ], axis=1)

     def test_fit_success(self):
-        for freq, df in [['d', self.df], ['d', self.df_string_time], ['month', self.df_monthly]]:
+        for freq, frequency_quantity, df, seasonal_periods in [
+                ['d', 1, self.df, [1, 7]],
+                ['d', 1, self.df_string_time, [1, 7]],
+                ['month', 1, self.df_monthly, [1, 12]],
+                ['min', 5, self.df_with_5_minute_interval, [1]],
+                ['min', 10, self.df_with_10_minute_interval, [1]],
+                ['min', 15, self.df_with_15_minute_interval, [1]],
+                ['min', 30, self.df_with_30_minute_interval, [1]]]:
             arima_estimator = ArimaEstimator(horizon=1,
                                              frequency_unit=freq,
+                                             frequency_quantity=frequency_quantity,
                                              metric="smape",
-                                             seasonal_periods=[1, 7],
+                                             seasonal_periods=seasonal_periods,
                                              num_folds=2)

             results_pd = arima_estimator.fit(df)
@@ -64,6 +88,7 @@ def test_fit_success_with_exogenous(self):
         arima_estimator = ArimaEstimator(horizon=1,
                                          frequency_unit="d",
+                                         frequency_quantity=1,
                                          metric="smape",
                                          seasonal_periods=[1, 7],
                                          num_folds=2,
@@ -74,11 +99,16 @@
         self.assertIn("pickled_model", results_pd)

     def test_fit_success_with_split_cutoff(self):
-        for freq, df, split_cutoff in [['d', self.df, '2020-07-17 00:00:00'],
-                                       ['d', self.df_string_time, '2020-07-17 00:00:00'],
-                                       ['month', self.df_monthly, '2020-09-07 00:00:00']]:
+        for freq, frequency_quantity, df, split_cutoff in [['d', 1, self.df, '2020-07-17 00:00:00'],
+                                                           ['d', 1, self.df_string_time, '2020-07-17 00:00:00'],
+                                                           ['month', 1, self.df_monthly, '2020-09-07 00:00:00'],
+                                                           ['min', 5, self.df_with_5_minute_interval, '2020-07-05 00:30:00'],
+                                                           ['min', 10, self.df_with_10_minute_interval, '2020-07-05 01:00:00'],
+                                                           ['min', 15, self.df_with_15_minute_interval, '2020-07-05 01:30:00'],
+                                                           ['min', 30, self.df_with_30_minute_interval, '2020-07-05 03:00:00']]:
             arima_estimator = ArimaEstimator(horizon=1,
                                              frequency_unit=freq,
+                                             frequency_quantity=frequency_quantity,
                                              metric="smape",
                                              seasonal_periods=[1, 7],
                                              num_folds=2,
@@ -90,18 +120,20 @@ def test_fit_skip_too_long_seasonality(self):
         arima_estimator = ArimaEstimator(horizon=1,
                                          frequency_unit="d",
+                                         frequency_quantity=1,
                                          metric="smape",
                                          seasonal_periods=[3, 14],
                                          num_folds=2)
         with self.assertLogs(logger="databricks.automl_runtime.forecast.pmdarima.training", level="WARNING") as cm:
             results_pd = arima_estimator.fit(self.df)
-        self.assertIn("Skipping seasonal_period=14 (D). Dataframe timestamps must span at least two seasonality periods", cm.output[0])
+        self.assertIn("Skipping seasonal_period=14 (1D). Dataframe timestamps must span at least two seasonality periods", cm.output[0])

     @patch("databricks.automl_runtime.forecast.prophet.forecast.utils.generate_cutoffs")
     def test_fit_horizon_truncation(self, mock_generate_cutoffs):
         period = 2
         arima_estimator = ArimaEstimator(horizon=100,
                                          frequency_unit="d",
+                                         frequency_quantity=1,
                                          metric="smape",
                                          seasonal_periods=[period],
                                          num_folds=2)
@@ -119,6 +151,7 @@ def test_fit_horizon_truncation_one_cutoff(self, mock_fit_predict):
         period = 2
         arima_estimator = ArimaEstimator(horizon=100,
                                          frequency_unit="d",
+                                         frequency_quantity=1,
                                          metric="smape",
                                          seasonal_periods=[period],
                                          num_folds=2)
@@ -137,6 +170,7 @@ def test_fit_success_with_failed_seasonal_periods(self):
         # The fit method still succeeds because m=1 succeeds
         arima_estimator = ArimaEstimator(horizon=1,
                                          frequency_unit="d",
+                                         frequency_quantity=1,
                                          metric="smape",
                                          seasonal_periods=[1, 7, 30],
                                          num_folds=2)
@@ -147,6 +181,7 @@ def test_fit_failure_inconsistent_frequency(self):
         arima_estimator = ArimaEstimator(horizon=1,
                                          frequency_unit="W",
+                                         frequency_quantity=1,
                                          metric="smape",
                                          seasonal_periods=[1],
                                          num_folds=2)
@@ -156,6 +191,7 @@ def test_fit_failure_no_succeeded_model(self):
         arima_estimator = ArimaEstimator(horizon=1,
                                          frequency_unit="d",
+                                         frequency_quantity=1,
                                          metric="smape",
                                          seasonal_periods=[30],
                                          num_folds=2)
@@ -182,7 +218,7 @@ def test_fill_missing_time_steps(self):
         )
         indices_to_drop = [5, 8]
         df_missing = pd.DataFrame({"ds": ds, "y": range(12)}).drop(indices_to_drop).reset_index(drop=True)
-        df_filled = ArimaEstimator._fill_missing_time_steps(df_missing, frequency=frequency)
+        df_filled = ArimaEstimator._fill_missing_time_steps(df_missing, frequency_unit=frequency, frequency_quantity=1)
         for index in indices_to_drop:
             self.assertTrue(df_filled["y"][index] == df_filled["y"][index - 1])
         self.assertEqual(ds.to_list(), df_filled["ds"].to_list())
@@ -196,16 +232,35 @@ def test_fill_missing_time_steps_with_exogenous(self):
         )
         indices_to_drop = [5, 8]
         df_missing = pd.DataFrame({"ds": ds, "y": range(12), "x": range(12)}).drop(indices_to_drop).reset_index(drop=True)
-        df_filled = ArimaEstimator._fill_missing_time_steps(df_missing, frequency=frequency)
+        df_filled = ArimaEstimator._fill_missing_time_steps(df_missing, frequency_unit=frequency, frequency_quantity=1)
         for index in indices_to_drop:
             self.assertTrue(df_filled["y"][index] == df_filled["y"][index - 1])
             self.assertTrue(df_filled["x"][index] == df_filled["x"][index - 1])
         self.assertEqual(ds.to_list(), df_filled["ds"].to_list())

+    def test_fill_missing_time_steps_with_multiple_frequency_quantities(self):
+        supported_quantities = [1, 5, 10, 15, 30]
+        start_ds = pd.Timestamp("2020-07-05 00:00:00")
+        for quantity in supported_quantities:
+            ds = pd.date_range(start=start_ds, periods=12, freq=pd.DateOffset(**{'minutes': quantity}))
+            indices_to_drop = [5, 8]
+            df_missing = pd.DataFrame({"ds": ds, "y": range(12)}).drop(indices_to_drop).reset_index(drop=True)
+            df_filled = ArimaEstimator._fill_missing_time_steps(df_missing, frequency_unit='min', frequency_quantity=quantity)
+            for index in indices_to_drop:
+                self.assertTrue(df_filled["y"][index] == df_filled["y"][index - 1])
+            self.assertEqual(ds.to_list(), df_filled["ds"].to_list())
+
     def test_validate_ds_freq_matched_frequency(self):
-        ArimaEstimator._validate_ds_freq(self.df, frequency='D')
-        ArimaEstimator._validate_ds_freq(self.df_monthly, frequency='month')
+        ArimaEstimator._validate_ds_freq(self.df, frequency_unit='D', frequency_quantity=1)
+        ArimaEstimator._validate_ds_freq(self.df_monthly, frequency_unit='month', frequency_quantity=1)
+        ArimaEstimator._validate_ds_freq(self.df_with_5_minute_interval, frequency_unit='min', frequency_quantity=5)
+        ArimaEstimator._validate_ds_freq(self.df_with_10_minute_interval, frequency_unit='min', frequency_quantity=10)
+        ArimaEstimator._validate_ds_freq(self.df_with_15_minute_interval, frequency_unit='min', frequency_quantity=15)
+        ArimaEstimator._validate_ds_freq(self.df_with_30_minute_interval, frequency_unit='min', frequency_quantity=30)

     def test_validate_ds_freq_unmatched_frequency(self):
         with pytest.raises(ValueError, match="includes different frequency"):
-            ArimaEstimator._validate_ds_freq(self.df, frequency='W')
+            ArimaEstimator._validate_ds_freq(self.df, frequency_unit='W', frequency_quantity=1)
+
+        with pytest.raises(ValueError, match="includes different frequency"):
+            ArimaEstimator._validate_ds_freq(self.df_with_5_minute_interval, frequency_unit='min', frequency_quantity=10)
diff --git a/runtime/tests/automl_runtime/forecast/prophet/diagnostics_test.py b/runtime/tests/automl_runtime/forecast/prophet/diagnostics_test.py
index 0018f05..7b7b924 100644
--- a/runtime/tests/automl_runtime/forecast/prophet/diagnostics_test.py
+++ b/runtime/tests/automl_runtime/forecast/prophet/diagnostics_test.py
@@ -43,7 +43,7 @@ def test_cross_validation_success(self):
         cutoffs = generate_cutoffs(
             self.X,
             horizon=3,
-            unit="MS",
+            frequency_unit="MS",
             seasonal_period=1,
             seasonal_unit="D",
             num_folds=3,
diff --git a/runtime/tests/automl_runtime/forecast/prophet/forecast_test.py b/runtime/tests/automl_runtime/forecast/prophet/forecast_test.py
index d3b9478..b1afbe4 100644
--- a/runtime/tests/automl_runtime/forecast/prophet/forecast_test.py
+++ b/runtime/tests/automl_runtime/forecast/prophet/forecast_test.py
@@ -58,6 +58,22 @@ def setUp(self) -> None:
             pd.Series(range(self.num_rows), name="ds").apply(lambda i: f"{2012+i:04d}-01-15"),
             y_series
         ], axis=1)
+        self.df_with_5_minute_interval = pd.concat([
+            pd.Series(pd.date_range(start="2020-07-05 00:00:00", periods=self.num_rows, freq="5min"), name="ds"),
+            y_series,
+        ], axis=1)
+        self.df_with_10_minute_interval = pd.concat([
+            pd.Series(pd.date_range(start="2020-07-05 00:00:00", periods=self.num_rows, freq="10min"), name="ds"),
+            y_series,
+        ], axis=1)
+        self.df_with_15_minute_interval = pd.concat([
+            pd.Series(pd.date_range(start="2020-07-05 00:00:00", periods=self.num_rows, freq="15min"), name="ds"),
+            y_series,
+        ], axis=1)
+        self.df_with_30_minute_interval = pd.concat([
+            pd.Series(pd.date_range(start="2020-07-05 00:00:00", periods=self.num_rows, freq="30min"), name="ds"),
+            y_series,
+        ], axis=1)
         self.search_space = {"changepoint_prior_scale": hp.loguniform("changepoint_prior_scale", -2.3, -0.7)}

     def test_sequential_training(self):
@@ -116,6 +132,39 @@ def test_monthly_sequential_training(self):
         model_json = json.loads(results["model_json"][0])
         self.assertGreaterEqual(model_json["changepoint_prior_scale"], 0.1)
         self.assertLessEqual(model_json["changepoint_prior_scale"], 0.5)
+
+    def test_sequential_training_with_multiple_frequency_quantities(self):
+        search_space = {"changepoint_prior_scale": hp.loguniform("changepoint_prior_scale", -2.3, -0.7)}
+        search_space["seasonality_mode"] = hp.choice(
+            'seasonality_mode', ['additive', 'multiplicative']
+        )
+        for df, frequency_quantity, frequency_unit in [[self.df_with_5_minute_interval, 5, "min"],
+                                                       [self.df_with_10_minute_interval, 10, "min"],
+                                                       [self.df_with_15_minute_interval, 15, "min"],
+                                                       [self.df_with_30_minute_interval, 30, "min"]]:
+            hyperopt_estim = ProphetHyperoptEstimator(horizon=1,
+                                                      frequency_unit=frequency_unit,
+                                                      frequency_quantity=frequency_quantity,
+                                                      metric="smape",
+                                                      interval_width=0.8,
+                                                      country_holidays="US",
+                                                      search_space=search_space,
+                                                      num_folds=2,
+                                                      trial_timeout=1000,
+                                                      random_state=0,
+                                                      is_parallel=False)
+            results = hyperopt_estim.fit(df)
+            self.assertAlmostEqual(results["mse"][0], 0, delta=0.0002)
+            self.assertAlmostEqual(results["rmse"][0], 0, delta=0.02)
+            self.assertAlmostEqual(results["mae"][0], 0, delta=0.02)
+            self.assertAlmostEqual(results["mape"][0], 0, delta=0.002)
+            self.assertAlmostEqual(results["mdape"][0], 0, delta=0.002)
+            self.assertAlmostEqual(results["smape"][0], 0, delta=0.002)
+            self.assertGreaterEqual(results["coverage"][0], 0.5)
+            # check the best result parameter is inside the search space
+            model_json = json.loads(results["model_json"][0])
+            self.assertGreaterEqual(model_json["changepoint_prior_scale"], 0.1)
+            self.assertLessEqual(model_json["changepoint_prior_scale"], 0.5)

     def test_training_with_extra_regressors(self):
         df = pd.concat([
diff --git a/runtime/tests/automl_runtime/forecast/prophet/model_test.py b/runtime/tests/automl_runtime/forecast/prophet/model_test.py
index 4ce65da..2699705 100644
--- a/runtime/tests/automl_runtime/forecast/prophet/model_test.py
+++ b/runtime/tests/automl_runtime/forecast/prophet/model_test.py
@@ -80,7 +80,7 @@ def setUpClass(cls) -> None:
         cls.model = model_from_json(cls.model_json)

     def test_model_save_and_load(self):
-        prophet_model = ProphetModel(self.model_json, 1, "d", "ds")
+        prophet_model = ProphetModel(self.model_json, 1, "d", 1, "ds")

         with mlflow.start_run() as run:
             mlflow_prophet_log_model(prophet_model)
@@ -110,7 +110,7 @@ def test_make_future_dataframe(self):
             # don't have full support yet.
             if OFFSET_ALIAS_MAP[feq_unit] in ['YS', 'MS', 'QS']:
                 continue
-            prophet_model = ProphetModel(self.model_json, 1, feq_unit, "ds")
+            prophet_model = ProphetModel(self.model_json, 1, feq_unit, 1, "ds")
             future_df = prophet_model.make_future_dataframe(1)
             offset_kw_arg = DATE_OFFSET_KEYWORD_MAP[OFFSET_ALIAS_MAP[feq_unit]]
             expected_time = pd.Timestamp("2020-10-25") + pd.DateOffset(**offset_kw_arg)
@@ -118,8 +118,18 @@ def test_make_future_dataframe(self):
                              f"Wrong future dataframe generated with frequency {feq_unit}:"
                              f" Expect {expected_time}, but get {future_df.iloc[-1]['ds']}")

+    def test_make_future_dataframe_with_multiple_frequency_quantities(self):
+        for frequency_quantity in [1, 5, 10, 15, 30]:
+            prophet_model = ProphetModel(self.model_json, 1, "min", frequency_quantity, "ds")
+            future_df = prophet_model.make_future_dataframe(1)
+            offset_kw_arg = DATE_OFFSET_KEYWORD_MAP[OFFSET_ALIAS_MAP["min"]]
+            expected_time = pd.Timestamp("2020-10-25") + pd.DateOffset(**offset_kw_arg)*frequency_quantity
+            self.assertEqual(future_df.iloc[-1]["ds"], expected_time,
+                             f"Wrong future dataframe generated with frequency min:"
+                             f" Expect {expected_time}, but get {future_df.iloc[-1]['ds']}")
+
     def test_predict_success_datetime_date(self):
-        prophet_model = ProphetModel(self.model_json, 1, "d", "ds")
+        prophet_model = ProphetModel(self.model_json, 1, "d", 1, "ds")
         test_df = pd.DataFrame(
             {"ds": [datetime.date(2020, 10, 8), datetime.date(2020, 12, 10)]}
         )
@@ -131,7 +141,7 @@ def test_predict_success_datetime_date(self):
         )  # check the input dataframe is unchanged

     def test_predict_success_string(self):
-        prophet_model = ProphetModel(self.model_json, 1, "d", "ds")
+        prophet_model = ProphetModel(self.model_json, 1, "d", 1, "ds")
         test_df = pd.DataFrame({"ds": ["2020-10-08", "2020-12-10"]})
         expected_test_df = test_df.copy()
         yhat = prophet_model.predict(None, test_df)
@@ -140,8 +150,19 @@ def test_predict_success_string(self):
             test_df, expected_test_df
         )  # check the input dataframe is unchanged

+    def test_predict_multiple_frequency_quantities(self):
+        for frequency_quantity in [1, 5, 10, 15, 30]:
+            prophet_model = ProphetModel(self.model_json, 1, "min", frequency_quantity, "ds")
+            test_df = pd.DataFrame({"ds": ["2020-10-08", "2020-12-10"]})
+            expected_test_df = test_df.copy()
+            yhat = prophet_model.predict(None, test_df)
+            self.assertEqual(2, len(yhat))
+            pd.testing.assert_frame_equal(
+                test_df, expected_test_df
+            )  # check the input dataframe is unchanged
+
     def test_validate_predict_cols(self):
-        prophet_model = ProphetModel(self.model_json, 1, "d", "time")
+        prophet_model = ProphetModel(self.model_json, 1, "d", 1, "time")
         test_df = pd.DataFrame(
             {
                 "date": [pd.to_datetime("2020-11-01"), pd.to_datetime("2020-11-04")],
@@ -173,7 +194,8 @@ def setUpClass(cls) -> None:
             timeseries_starts=cls.multi_series_start,
             timeseries_end="2020-07-25",
             horizon=1,
-            frequency="days",
+            frequency_unit="days",
+            frequency_quantity=1,
             time_col="time",
             id_cols=["id"],
         )
@@ -241,6 +263,7 @@ def test_model_save_and_load_multi_ids(self):
             "2020-07-25",
             1,
             "days",
+            1,
             "time",
             ["id1", "id2"],
         )
@@ -302,7 +325,8 @@ def test_validate_predict_cols(self):
             timeseries_starts=self.multi_series_start,
             timeseries_end="2020-07-25",
             horizon=1,
-            frequency="days",
+            frequency_unit="days",
+            frequency_quantity=1,
             time_col="ds",
             id_cols=["id1"],
         )
@@ -338,6 +362,23 @@ def test_make_future_dataframe(self):
         self.assertCountEqual(future_df.columns, {"ds", "id"})
         self.assertEqual(2, future_df.shape[0])

+    def test_make_future_dataframe_multiple_frequency_quantities(self):
+        for frequency_quantity in [1, 5, 10, 15, 30]:
+            prophet_model = MultiSeriesProphetModel(
+                model_json=self.multi_series_model_json,
+                timeseries_starts=self.multi_series_start,
+                timeseries_end="2020-07-25",
+                horizon=1,
+                frequency_unit="min",
+                frequency_quantity=frequency_quantity,
+                time_col="time",
+                id_cols=["id"],
+            )
+            future_df = prophet_model.make_future_dataframe(include_history=False)
+            self.assertCountEqual(future_df.columns, {"ds", "id"})
+            self.assertEqual(2, future_df.shape[0])
+
     def test_make_future_dataframe_multi_ids(self):
         multi_series_model_json = {(1, "1"): self.model_json, (2, "1"): self.model_json}
         multi_series_start = {
@@ -350,6 +391,7 @@ def test_make_future_dataframe_multi_ids(self):
             "2020-07-25",
             1,
             "days",
+            1,
             "time",
             ["id1", "id2"],
         )
diff --git a/runtime/tests/automl_runtime/forecast/utils_test.py b/runtime/tests/automl_runtime/forecast/utils_test.py
index 3d9c519..c043da9 100644
--- a/runtime/tests/automl_runtime/forecast/utils_test.py
+++ b/runtime/tests/automl_runtime/forecast/utils_test.py
@@ -131,11 +131,11 @@ def setUp(self) -> None:
         ).rename_axis("y").reset_index()

     def test_generate_cutoffs_success(self):
-        cutoffs = generate_cutoffs(self.X, horizon=7, unit="D", num_folds=3, seasonal_period=7)
+        cutoffs = generate_cutoffs(self.X, horizon=7, frequency_unit="D", num_folds=3, seasonal_period=7)
         self.assertEqual([pd.Timestamp('2020-08-16 00:00:00'), pd.Timestamp('2020-08-19 12:00:00'), pd.Timestamp('2020-08-23 00:00:00')], cutoffs)

     def test_generate_cutoffs_success_large_num_folds(self):
-        cutoffs = generate_cutoffs(self.X, horizon=7, unit="D", num_folds=20, seasonal_period=1)
+        cutoffs = generate_cutoffs(self.X, horizon=7, frequency_unit="D", num_folds=20, seasonal_period=1)
         self.assertEqual([pd.Timestamp('2020-07-22 12:00:00'),
                           pd.Timestamp('2020-07-26 00:00:00'),
                           pd.Timestamp('2020-07-29 12:00:00'),
@@ -151,7 +151,7 @@ def test_generate_cutoffs_success_with_gaps(self):
         df = pd.DataFrame(
             pd.date_range(start="2020-07-01", periods=30, freq='3d'), columns=["ds"]
         ).rename_axis("y").reset_index()
-        cutoffs = generate_cutoffs(df, horizon=1, unit="D", num_folds=5, seasonal_period=1)
+        cutoffs = generate_cutoffs(df, horizon=1, frequency_unit="D", num_folds=5, seasonal_period=1)
         self.assertEqual([pd.Timestamp('2020-09-13 00:00:00'),
                           pd.Timestamp('2020-09-16 00:00:00'),
                           pd.Timestamp('2020-09-19 00:00:00'),
@@ -167,10 +167,10 @@ def test_generate_cutoffs_success_hourly(self):
                             pd.Timestamp('2020-07-07 11:00:00'),
                             pd.Timestamp('2020-07-07 14:00:00'),
                             pd.Timestamp('2020-07-07 17:00:00')]
-        cutoffs = generate_cutoffs(df, horizon=6, unit="H", num_folds=5, seasonal_period=24)
+        cutoffs = generate_cutoffs(df, horizon=6, frequency_unit="H", num_folds=5, seasonal_period=24)
         self.assertEqual(expected_cutoffs, cutoffs)

-        cutoffs_different_seasonal_unit = generate_cutoffs(df, horizon=6, unit="H", num_folds=5,
+        cutoffs_different_seasonal_unit = generate_cutoffs(df, horizon=6, frequency_unit="H", num_folds=5,
                                                            seasonal_period=1, seasonal_unit="D")
         self.assertEqual(expected_cutoffs, cutoffs_different_seasonal_unit)

@@ -178,39 +178,63 @@ def test_generate_cutoffs_success_weekly(self):
         df = pd.DataFrame(
             pd.date_range(start="2020-07-01", periods=52, freq='W'), columns=["ds"]
         ).rename_axis("y").reset_index()
-        cutoffs = generate_cutoffs(df, horizon=4, unit="W", num_folds=3, seasonal_period=1)
+        cutoffs = generate_cutoffs(df, horizon=4, frequency_unit="W", num_folds=3, seasonal_period=1)
         self.assertEqual([pd.Timestamp('2021-05-02 00:00:00'), pd.Timestamp('2021-05-16 00:00:00'), pd.Timestamp('2021-05-30 00:00:00')], cutoffs)

     def test_generate_cutoffs_failure_horizon_too_large(self):
         with self.assertRaisesRegex(ValueError, "Less data than horizon after initial window. "
                                                 "Make horizon shorter."):
-            generate_cutoffs(self.X, horizon=20, unit="D", num_folds=3, seasonal_period=1)
+            generate_cutoffs(self.X, horizon=20, frequency_unit="D", num_folds=3, seasonal_period=1)

     def test_generate_cutoffs_less_data(self):
         with self.assertRaisesRegex(ValueError, "Less data than horizon."):
-            generate_cutoffs(self.X, horizon=100, unit="D", num_folds=3, seasonal_period=1)
+            generate_cutoffs(self.X, horizon=100, frequency_unit="D", num_folds=3, seasonal_period=1)

     def test_generate_cutoffs_success_monthly(self):
         df = pd.DataFrame(
             pd.date_range(start="2020-01-12", periods=24, freq=pd.DateOffset(months=1)), columns=["ds"]
         ).rename_axis("y").reset_index()
-        cutoffs = generate_cutoffs(df, horizon=2, unit="MS", num_folds=3, seasonal_period=1)
+        cutoffs = generate_cutoffs(df, horizon=2, frequency_unit="MS", num_folds=3, seasonal_period=1)
         self.assertEqual([pd.Timestamp('2021-08-12 00:00:00'), pd.Timestamp('2021-9-12 00:00:00'), pd.Timestamp('2021-10-12 00:00:00')], cutoffs)

     def test_generate_cutoffs_success_quaterly(self):
         df = pd.DataFrame(
             pd.date_range(start="2020-07-12", periods=9, freq=pd.DateOffset(months=3)), columns=["ds"]
         ).rename_axis("y").reset_index()
-        cutoffs = generate_cutoffs(df, horizon=1, unit="QS", num_folds=3, seasonal_period=1)
+        cutoffs = generate_cutoffs(df, horizon=1, frequency_unit="QS", num_folds=3, seasonal_period=1)
         self.assertEqual([pd.Timestamp('2021-10-12 00:00:00'), pd.Timestamp('2022-01-12 00:00:00'), pd.Timestamp('2022-04-12 00:00:00')], cutoffs)

     def test_generate_cutoffs_success_annualy(self):
         df = pd.DataFrame(
             pd.date_range(start="2012-07-14", periods=10, freq=pd.DateOffset(years=1)), columns=["ds"]
         ).rename_axis("y").reset_index()
-        cutoffs = generate_cutoffs(df, horizon=1, unit="YS", num_folds=3, seasonal_period=1)
+        cutoffs = generate_cutoffs(df, horizon=1, frequency_unit="YS", num_folds=3, seasonal_period=1)
         self.assertEqual([pd.Timestamp('2018-07-14 00:00:00'), pd.Timestamp('2019-07-14 00:00:00'), pd.Timestamp('2020-07-14 00:00:00')], cutoffs)

+    def test_generate_cutoffs_success_with_multiple_frequency_quantities(self):
+        df = pd.DataFrame(
+            pd.date_range(start="2020-07-01 00:00:00", end="2020-07-01 23:55:00", freq='5T'), columns=["ds"]
+        ).rename_axis("y").reset_index()
+        cutoffs = generate_cutoffs(df, horizon=1, frequency_unit="min", num_folds=3, seasonal_period=1)
+        self.assertEqual([pd.Timestamp('2020-07-01 23:44:00'), pd.Timestamp('2020-07-01 23:49:00'), pd.Timestamp('2020-07-01 23:54:00')], cutoffs)
+
+        df = pd.DataFrame(
+            pd.date_range(start="2020-07-01 00:00:00", end="2020-07-01 23:50:00", freq='10T'), columns=["ds"]
+        ).rename_axis("y").reset_index()
+        cutoffs = generate_cutoffs(df, horizon=1, frequency_unit="min", num_folds=3, seasonal_period=1)
+        self.assertEqual([pd.Timestamp('2020-07-01 23:29:00'), pd.Timestamp('2020-07-01 23:39:00'), pd.Timestamp('2020-07-01 23:49:00')], cutoffs)
+
+        df = pd.DataFrame(
+            pd.date_range(start="2020-07-01 00:00:00", end="2020-07-01 23:45:00", freq='15T'), columns=["ds"]
+        ).rename_axis("y").reset_index()
+        cutoffs = generate_cutoffs(df, horizon=1, frequency_unit="min", num_folds=3, seasonal_period=1)
+        self.assertEqual([pd.Timestamp('2020-07-01 23:14:00'), pd.Timestamp('2020-07-01 23:29:00'), pd.Timestamp('2020-07-01 23:44:00')], cutoffs)
+
+        df = pd.DataFrame(
+            pd.date_range(start="2020-07-01 00:00:00", end="2020-07-01 23:30:00", freq='30T'), columns=["ds"]
+        ).rename_axis("y").reset_index()
+        cutoffs = generate_cutoffs(df, horizon=1, frequency_unit="min", num_folds=3, seasonal_period=1)
+        self.assertEqual([pd.Timestamp('2020-07-01 22:29:00'), pd.Timestamp('2020-07-01 22:59:00'), pd.Timestamp('2020-07-01 23:29:00')], cutoffs)
+

 class TestTestGenerateCustomCutoffs(unittest.TestCase):
@@ -222,56 +246,81 @@ def test_generate_custom_cutoffs_success_hourly(self):
                             pd.Timestamp('2020-07-07 14:00:00'),
                             pd.Timestamp('2020-07-07 15:00:00'),
                             pd.Timestamp('2020-07-07 16:00:00')]
-        cutoffs = generate_custom_cutoffs(df, horizon=7, unit="H", split_cutoff=pd.Timestamp('2020-07-07 13:00:00'))
+        cutoffs = generate_custom_cutoffs(df, horizon=7, frequency_unit="H", split_cutoff=pd.Timestamp('2020-07-07 13:00:00'))
         self.assertEqual(expected_cutoffs, cutoffs)

     def test_generate_custom_cutoffs_success_daily(self):
         df = pd.DataFrame(
             pd.date_range(start="2020-07-01", end="2020-08-30", freq='d'), columns=["ds"]
         ).rename_axis("y").reset_index()
-        cutoffs = generate_custom_cutoffs(df, horizon=7, unit="D", split_cutoff=pd.Timestamp('2020-08-21 00:00:00'))
+        cutoffs = generate_custom_cutoffs(df, horizon=7, frequency_unit="D", split_cutoff=pd.Timestamp('2020-08-21 00:00:00'))
         self.assertEqual([pd.Timestamp('2020-08-21 00:00:00'), pd.Timestamp('2020-08-22 00:00:00'), pd.Timestamp('2020-08-23 00:00:00')], cutoffs)

     def test_generate_custom_cutoffs_success_small_horizon(self):
         df = pd.DataFrame(
             pd.date_range(start="2020-07-01", end="2020-08-30", freq='2d'), columns=["ds"]
         ).rename_axis("y").reset_index()
-        cutoffs = generate_custom_cutoffs(df, horizon=1, unit="D", split_cutoff=pd.Timestamp('2020-08-26 00:00:00'))
+        cutoffs = generate_custom_cutoffs(df, horizon=1, frequency_unit="D", split_cutoff=pd.Timestamp('2020-08-26 00:00:00'))
         self.assertEqual([pd.Timestamp('2020-08-27 00:00:00'), pd.Timestamp('2020-08-29 00:00:00')], cutoffs)

     def test_generate_custom_cutoffs_success_weekly(self):
         df = pd.DataFrame(
             pd.date_range(start="2020-07-01", periods=52, freq='W'), columns=["ds"]
         ).rename_axis("y").reset_index()
-        cutoffs = generate_custom_cutoffs(df, horizon=7, unit="W", split_cutoff=pd.Timestamp('2021-04-25 00:00:00'))
+        cutoffs = generate_custom_cutoffs(df, horizon=7, frequency_unit="W", split_cutoff=pd.Timestamp('2021-04-25 00:00:00'))
         self.assertEqual([pd.Timestamp('2021-04-25 00:00:00'), pd.Timestamp('2021-05-02 00:00:00'), pd.Timestamp('2021-05-09 00:00:00')], cutoffs)

     def test_generate_custom_cutoffs_success_monthly(self):
         df = pd.DataFrame(
             pd.date_range(start="2020-01-12", periods=24, freq=pd.DateOffset(months=1)), columns=["ds"]
         ).rename_axis("y").reset_index()
-        cutoffs = generate_custom_cutoffs(df, horizon=7, unit="MS", split_cutoff=pd.Timestamp('2021-03-12 00:00:00'))
+        cutoffs = generate_custom_cutoffs(df, horizon=7, frequency_unit="MS", split_cutoff=pd.Timestamp('2021-03-12 00:00:00'))
         self.assertEqual([pd.Timestamp('2021-03-12 00:00:00'), pd.Timestamp('2021-04-12 00:00:00'), pd.Timestamp('2021-05-12 00:00:00')], cutoffs)

     def test_generate_custom_cutoffs_success_quaterly(self):
         df = pd.DataFrame(
             pd.date_range(start="2020-07-12", periods=9, freq=pd.DateOffset(months=3)), columns=["ds"]
         ).rename_axis("y").reset_index()
-        cutoffs = generate_custom_cutoffs(df, horizon=7, unit="QS", split_cutoff=pd.Timestamp('2020-07-12 00:00:00'))
+        cutoffs = generate_custom_cutoffs(df, horizon=7, frequency_unit="QS", split_cutoff=pd.Timestamp('2020-07-12 00:00:00'))
         self.assertEqual([pd.Timestamp('2020-07-12 00:00:00'), pd.Timestamp('2020-10-12 00:00:00')], cutoffs)

     def test_generate_custom_cutoffs_success_annualy(self):
         df = pd.DataFrame(
             pd.date_range(start="2012-07-14", periods=10, freq=pd.DateOffset(years=1)), columns=["ds"]
         ).rename_axis("y").reset_index()
-        cutoffs = generate_custom_cutoffs(df, horizon=7, unit="YS", split_cutoff=pd.Timestamp('2012-07-14 00:00:00'))
+        cutoffs = generate_custom_cutoffs(df, horizon=7, frequency_unit="YS", split_cutoff=pd.Timestamp('2012-07-14 00:00:00'))
         self.assertEqual([pd.Timestamp('2012-07-14 00:00:00'), pd.Timestamp('2013-07-14 00:00:00'), pd.Timestamp('2014-07-14 00:00:00')], cutoffs)

+    def test_generate_custom_cutoffs_success_with_multiple_frequency_quantities(self):
+        df = pd.DataFrame(
+            pd.date_range(start="2020-07-01 00:00:00", end="2020-07-01 23:55:00", freq='5T'), columns=["ds"]
+        ).rename_axis("y").reset_index()
+        cutoffs = generate_custom_cutoffs(df, horizon=1, frequency_unit="min", frequency_quantity=5, split_cutoff=pd.Timestamp('2020-07-01 23:45:00'))
+        self.assertEqual([pd.Timestamp('2020-07-01 23:45:00'), pd.Timestamp('2020-07-01 23:50:00')], cutoffs)
+
+        df = pd.DataFrame(
+            pd.date_range(start="2020-07-01 00:00:00", end="2020-07-01 23:50:00", freq='10T'), columns=["ds"]
+        ).rename_axis("y").reset_index()
+        cutoffs = generate_custom_cutoffs(df, horizon=1, frequency_unit="min", frequency_quantity=10, split_cutoff=pd.Timestamp('2020-07-01 23:30:00'))
+        self.assertEqual([pd.Timestamp('2020-07-01 23:30:00'), pd.Timestamp('2020-07-01 23:40:00')], cutoffs)
+
+        df = pd.DataFrame(
+            pd.date_range(start="2020-07-01 00:00:00", end="2020-07-01 23:45:00", freq='15T'), columns=["ds"]
+        ).rename_axis("y").reset_index()
+        cutoffs = generate_custom_cutoffs(df, horizon=1, frequency_unit="min", frequency_quantity=15, split_cutoff=pd.Timestamp('2020-07-01 23:15:00'))
+        self.assertEqual([pd.Timestamp('2020-07-01 23:15:00'), pd.Timestamp('2020-07-01 23:30:00')], cutoffs)
+
+        df = pd.DataFrame(
+            pd.date_range(start="2020-07-01 00:00:00", end="2020-07-01 23:30:00", freq='30T'), columns=["ds"]
+        ).rename_axis("y").reset_index()
+        cutoffs = generate_custom_cutoffs(df, horizon=1, frequency_unit="min", frequency_quantity=30, split_cutoff=pd.Timestamp('2020-07-01 23:00:00'))
+        self.assertEqual([pd.Timestamp('2020-07-01 23:00:00')], cutoffs)
+
     def test_generate_custom_cutoffs_success_with_small_gaps(self):
         df = pd.DataFrame(
             pd.date_range(start="2020-07-01", periods=30, freq='3d'), columns=["ds"]
         ).rename_axis("y").reset_index()
-        cutoffs = generate_custom_cutoffs(df, horizon=7, unit="D", split_cutoff=pd.Timestamp('2020-09-17 00:00:00'))
+        cutoffs = generate_custom_cutoffs(df, horizon=7, frequency_unit="D", split_cutoff=pd.Timestamp('2020-09-17 00:00:00'))
         self.assertEqual([pd.Timestamp('2020-09-17 00:00:00'),
                           pd.Timestamp('2020-09-18 00:00:00'),
                           pd.Timestamp('2020-09-19 00:00:00')], cutoffs)

@@ -280,7 +329,7 @@ def test_generate_custom_cutoffs_success_with_large_gaps(self):
         df = pd.DataFrame(
             pd.date_range(start="2020-07-01", periods=30, freq='9d'), columns=["ds"]
         ).rename_axis("y").reset_index()
-        cutoffs = generate_custom_cutoffs(df, horizon=7, unit="D", split_cutoff=pd.Timestamp('2021-03-08 00:00:00'))
+        cutoffs = generate_custom_cutoffs(df, horizon=7, frequency_unit="D", split_cutoff=pd.Timestamp('2021-03-08 00:00:00'))
         self.assertEqual([pd.Timestamp('2021-03-08 00:00:00'),
                           pd.Timestamp('2021-03-09 00:00:00'),
                           pd.Timestamp('2021-03-12 00:00:00')], cutoffs)

@@ -300,7 +349,7 @@ def test_calculate_period_differences_evenly(self):
             )
         })
         periods = df.apply(lambda x: calculate_period_differences(
-            x.start_time, x.end_time, 'month'
+            x.start_time, x.end_time, 'month', 1
         ), axis=1)
         self.assertTrue((periods == pd.Series([4, 5, 12])).all())
@@ -314,13 +363,24 @@ def test_calculate_period_differences_unevenly(self):
             )
         })
         periods = df.apply(lambda x: calculate_period_differences(
-            x.start_time, x.end_time, 'month'
+            x.start_time, x.end_time, 'month', 1
         ), axis=1)
         self.assertTrue((periods == pd.Series([4, 5, 0])).all())
         periods = df.apply(lambda x: calculate_period_differences(
-            x.start_time, x.end_time, 'day'
+            x.start_time, x.end_time, 'day', 1
         ), axis=1)
         self.assertTrue((periods == pd.Series([118, 151, 0])).all())

+    def test_calculate_period_differences_with_frequency_quantity(self):
+        for frequency_quantity in [1, 5, 10, 15, 30]:
+            df = pd.DataFrame({
+                'start_time': pd.date_range(start="2020-07-01 00:00:00", periods=10, freq=f'{frequency_quantity}T'),
+                'end_time': pd.date_range(start="2020-07-01 04:00:00", periods=10, freq=f'{frequency_quantity}T')
+            })
+            periods = df.apply(lambda x: calculate_period_differences(
+                x.start_time, x.end_time, 'min', frequency_quantity
+            ), axis=1)
+            self.assertTrue((periods == pd.Series([240//frequency_quantity]*10)).all())
+
     def test_frequency_consistency(self):
         start_time = pd.Series(
@@ -331,14 +391,25 @@ def test_frequency_consistency(self):
         )
         end_time = pd.Series(
         )
         start_scalar = pd.to_datetime('2021-01-14')
         end_scalar = pd.to_datetime('2021-05-16')
-        self.assertFalse(is_frequency_consistency(start_scalar, end_scalar, 'month'))
+        self.assertFalse(is_frequency_consistency(start_scalar, end_scalar, 'month', 1))
         self.assertTrue(start_time.apply(
-            lambda x: is_frequency_consistency(x, end_scalar, 'day')
+            lambda x: is_frequency_consistency(x, end_scalar, 'day', 1)
        ).all())
         self.assertTrue(end_time.apply(
-            lambda x: is_frequency_consistency(start_scalar, x, 'month')
+            lambda x: is_frequency_consistency(start_scalar, x, 'month', 1)
         ).all())

+    def test_frequency_consistency_with_frequency_quantity(self):
+        for frequency_quantity in [1, 5, 10, 15, 30]:
+            start_time = pd.date_range(start="2020-07-01 00:00:00", periods=10, freq=f'{frequency_quantity}T')
+            end_time = pd.date_range(start="2020-07-01 04:00:00", periods=10, freq=f'{frequency_quantity}T')
+            self.assertTrue(start_time.to_series().apply(
+                lambda x: is_frequency_consistency(x, end_time[0], 'min', frequency_quantity)
+            ).all())
+            self.assertTrue(end_time.to_series().apply(
+                lambda x: is_frequency_consistency(start_time[0], x, 'min', frequency_quantity)
+            ).all())
+

 class TestMakeFutureDataFrame(unittest.TestCase):

     def test_make_single_future_dataframe(self):
@@ -346,7 +417,8 @@ def test_make_single_future_dataframe(self):
             start_time=pd.to_datetime('2022-01-01'),
             end_time=pd.to_datetime('2022-01-04'),
             horizon=1,
-            frequency="d",
+            frequency_unit="d",
+            frequency_quantity=1,
             include_history=False,
             column_name="test_date"
         )
@@ -358,7 +430,8 @@ def test_make_single_future_dataframe(self):
             start_time=pd.to_datetime('2022-01-01'),
             end_time=pd.to_datetime('2022-01-04'),
             horizon=1,
-            frequency="d",
+            frequency_unit="d",
+            frequency_quantity=1,
             include_history=True,
             column_name="test_date"
         )
@@ -374,7 +447,25 @@ def test_make_single_future_dataframe_with_different_freq(self):
             start_time=start_time,
             end_time=end_time,
             horizon=1,
-            frequency=freq,
+            frequency_unit=freq,
+            frequency_quantity=1,
             include_history=True,
             column_name="test_date"
         )
         self.assertEqual(len(future_df), 3)
         expected_columns = { "test_date" }
         self.assertTrue(expected_columns.issubset(set(future_df.columns)))

+    def test_make_single_future_dataframe_with_different_frequency_quantities(self):
+        for frequency_quantity in [1, 5, 10, 15, 30]:
+            start_time = pd.to_datetime('2022-01-01')
+            end_time = start_time + pd.DateOffset(**{'minutes': 1}) * frequency_quantity
+            future_df = make_single_future_dataframe(
+                start_time=start_time,
+                end_time=end_time,
+                horizon=1,
+                frequency_unit="min",
+                frequency_quantity=frequency_quantity,
+                include_history=True,
+                column_name="test_date"
+            )
+            self.assertEqual(len(future_df), 3)
+            expected_columns = { "test_date" }
+            self.assertTrue(expected_columns.issubset(set(future_df.columns)))
+
@@ -383,24 +474,31 @@ def test_make_single_future_dataframe_with_different_freq(self):
     @parameterized.expand([
-        (pd.to_datetime('2022-01-01'), pd.to_datetime('2022-01-05'), None, None, { "ds" }, ),
-        (pd.to_datetime('2022-01-01'), pd.to_datetime('2022-01-05'), ["id"], [("1", )], { "ds", "id" }),
+        (pd.to_datetime('2022-01-01'), pd.to_datetime('2022-01-05'), None, None, { "ds" }, "d", 1),
+        (pd.to_datetime('2022-01-01'), pd.to_datetime('2022-01-05'), ["id"], [("1", )], { "ds", "id" }, "d", 1),
         (pd.to_datetime('2022-01-01'), pd.to_datetime('2022-01-05'), ["id1", "id2"],
-         [("1", 1)], { "ds", "id1", "id2" }),
+         [("1", 1)], { "ds", "id1", "id2" }, "d", 1),
         ({("1", ): pd.to_datetime('2022-01-01'), ("2", ): pd.to_datetime('2022-01-01')},
-         pd.to_datetime('2022-01-02'), ["id"], [("1", ), ("2", )], { "ds", "id" }),
+         pd.to_datetime('2022-01-02'), ["id"], [("1", ), ("2", )], { "ds", "id" }, "d", 1),
         ({("1", ): pd.to_datetime('2022-01-01'), ("2", ): pd.to_datetime('2022-01-01')},
          {("1", ): pd.to_datetime('2022-01-02'), ("2", ): pd.to_datetime('2022-01-02')},
-         ["id"], [("1", ), ("2", )], { "ds", "id" }),
+         ["id"], [("1", ), ("2", )], { "ds", "id" }, "d", 1),
+        (pd.Timestamp('2022-01-01 00:00:00'), pd.Timestamp('2022-01-01 00:20:00'), None, None, { "ds" }, "min", 5),
+        (pd.Timestamp('2022-01-01 00:00:00'), pd.Timestamp('2022-01-01 00:40:00'), None, None, { "ds" }, "min", 10),
+        (pd.Timestamp('2022-01-01 00:00:00'), pd.Timestamp('2022-01-01 01:00:00'), None, None, { "ds" }, "min", 15),
+        (pd.Timestamp('2022-01-01 00:00:00'), pd.Timestamp('2022-01-01 02:00:00'), None, None, { "ds" }, "min", 30),
     ])
     def test_make_future_dataframe(self, start_time, end_time, identity_column_names,
-                                   groups, expected_columns):
+                                   groups, expected_columns,
+                                   frequency_unit,
+                                   frequency_quantity):
         future_df = make_future_dataframe(
             start_time=start_time,
             end_time=end_time,
             horizon=1,
-            frequency="d",
+            frequency_unit=frequency_unit,
+            frequency_quantity=frequency_quantity,
             groups=groups,
             identity_column_names=identity_column_names,
         )
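A minimal sketch (not part of the patch) of the two-field frequency these tests exercise: a frequency_unit such as "min" and an integer frequency_quantity such as 5 jointly describe one period, and compose into a pandas offset alias. Only pandas is assumed; the helper name to_pandas_freq is hypothetical and used here purely for illustration:

    import pandas as pd

    def to_pandas_freq(frequency_unit: str, frequency_quantity: int) -> str:
        # Compose the pair into a pandas offset alias, e.g. ("min", 5) -> "5min".
        return f"{frequency_quantity}{frequency_unit}"

    # A 5-minute index like the ones the tests above build with freq="5min" / freq='5T':
    index = pd.date_range(start="2020-07-01 00:00:00", periods=12, freq=to_pandas_freq("min", 5))
    assert (index[1] - index[0]) == pd.Timedelta(minutes=5)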