diff --git a/runtime/databricks/automl_runtime/forecast/deepar/__init__.py b/runtime/databricks/automl_runtime/forecast/deepar/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/runtime/databricks/automl_runtime/forecast/deepar/model.py b/runtime/databricks/automl_runtime/forecast/deepar/model.py new file mode 100644 index 00000000..92636406 --- /dev/null +++ b/runtime/databricks/automl_runtime/forecast/deepar/model.py @@ -0,0 +1,149 @@ +# +# Copyright (C) 2024 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from typing import List, Optional + +import gluonts +import mlflow +import pandas as pd +from gluonts.dataset.pandas import PandasDataset +from gluonts.torch.model.predictor import PyTorchPredictor +from mlflow.utils.environment import _mlflow_conda_env + +from databricks.automl_runtime import version +from databricks.automl_runtime.forecast.model import ForecastModel, mlflow_forecast_log_model +from databricks.automl_runtime.forecast.deepar.utils import set_index_and_fill_missing_time_steps + +DEEPAR_ADDITIONAL_PIP_DEPS = [ + f"gluonts[torch]=={gluonts.__version__}", + f"pandas=={pd.__version__}", + f"databricks-automl-runtime=={version.__version__}" +] + +DEEPAR_CONDA_ENV = _mlflow_conda_env( + additional_pip_deps=DEEPAR_ADDITIONAL_PIP_DEPS +) + + +class DeepARModel(ForecastModel): + """ + DeepAR mlflow model wrapper for forecasting. + """ + + def __init__(self, model: PyTorchPredictor, horizon: int, frequency: str, + num_samples: int, + target_col: str, time_col: str, + id_cols: Optional[List[str]] = None) -> None: + """ + Initialize the DeepAR mlflow Python model wrapper + :param model: DeepAR model + :param horizon: the number of periods to forecast forward + :param frequency: the frequency of the time series + :param num_samples: the number of samples to draw from the distribution + :param target_col: the target column name + :param time_col: the time column name + :param id_cols: the column names of the identity columns for multi-series time series; None for single series + """ + + super().__init__() + self._model = model + self._horizon = horizon + self._frequency = frequency + self._num_samples = num_samples + self._target_col = target_col + self._time_col = time_col + self._id_cols = id_cols + + @property + def model_env(self): + return DEEPAR_CONDA_ENV + + def predict(self, + context: mlflow.pyfunc.model.PythonModelContext, + model_input: pd.DataFrame) -> pd.DataFrame: + """ + Predict the future dataframe given the history dataframe + :param context: A :class:`~PythonModelContext` instance containing artifacts that the model + can use to perform inference. + :param model_input: Input dataframe that contains the history data + :return: predicted pd.DataFrame that starts after the last timestamp in the input dataframe, + and predicts the horizon using the mean of the samples + """ + required_cols = [self._target_col, self._time_col] + if self._id_cols: + required_cols += self._id_cols + self._validate_cols(model_input, required_cols) + + forecast_sample_list = self.predict_samples(model_input, num_samples=self._num_samples) + + pred_df = pd.concat( + [ + forecast.mean_ts.rename('yhat').reset_index().assign(item_id=forecast.item_id) + for forecast in forecast_sample_list + ], + ignore_index=True + ) + + pred_df = pred_df.rename(columns={'index': self._time_col}) + if self._id_cols: + id_col_name = '-'.join(self._id_cols) + pred_df = pred_df.rename(columns={'item_id': id_col_name}) + else: + pred_df = pred_df.drop(columns='item_id') + + pred_df[self._time_col] = pred_df[self._time_col].dt.to_timestamp() + + return pred_df + + def predict_samples(self, + model_input: pd.DataFrame, + num_samples: int = None) -> List[gluonts.model.forecast.SampleForecast]: + """ + Predict the future samples given the history dataframe + :param model_input: Input dataframe that contains the history data + :param num_samples: the number of samples to draw from the distribution + :return: List of SampleForecast, where each SampleForecast contains num_samples sampled forecasts + """ + if num_samples is None: + num_samples = self._num_samples + + # Group by the time column in case there are multiple rows for each time column, + # for example, the user didn't provide all the identity columns for a multi-series dataset + group_cols = [self._time_col] + if self._id_cols: + group_cols += self._id_cols + model_input = model_input.groupby(group_cols).agg({self._target_col: "mean"}).reset_index() + + model_input_transformed = set_index_and_fill_missing_time_steps(model_input, + self._time_col, + self._frequency, + self._id_cols) + + test_ds = PandasDataset(model_input_transformed, target=self._target_col) + + forecast_iter = self._model.predict(test_ds, num_samples=num_samples) + forecast_sample_list = list(forecast_iter) + + return forecast_sample_list + + +def mlflow_deepar_log_model(deepar_model: DeepARModel, + sample_input: pd.DataFrame = None) -> None: + """ + Log the DeepAR model to mlflow + :param deepar_model: DeepAR mlflow PythonModel wrapper + :param sample_input: sample input Dataframes for model inference + """ + mlflow_forecast_log_model(deepar_model, sample_input) diff --git a/runtime/databricks/automl_runtime/forecast/deepar/utils.py b/runtime/databricks/automl_runtime/forecast/deepar/utils.py new file mode 100644 index 00000000..966f180c --- /dev/null +++ b/runtime/databricks/automl_runtime/forecast/deepar/utils.py @@ -0,0 +1,108 @@ +# +# Copyright (C) 2024 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from typing import List, Optional + +import pandas as pd + + +def validate_and_generate_index(df: pd.DataFrame, time_col: str, frequency: str): + """ + Generate a complete time index for the given DataFrame based on the specified frequency. + + - Ensures the time column is in datetime format. + - Validates consistency in the day of the month if frequency is "MS" (month start). + - Generates a new time index from the minimum to the maximum timestamp in the data. + + :param df: The input DataFrame containing the time column. + :param time_col: The name of the time column. + :param frequency: The frequency of the time series. + :return: A complete time index covering the full range of the dataset. + :raises ValueError: If the day-of-month pattern is inconsistent for "MS" frequency. + """ + if frequency.upper() != "MS": + return pd.date_range(df[time_col].min(), df[time_col].max(), freq=frequency) + + df[time_col] = pd.to_datetime(df[time_col]) # Ensure datetime format + + # Extract unique days + unique_days = df[time_col].dt.day.unique() + + if len(unique_days) == 1: + # All dates have the same day-of-month, considered consistent + day_of_month = unique_days[0] + else: + # Check if all dates are last days of their respective months + is_last_day = (df[time_col] + pd.offsets.MonthEnd(0)) == df[time_col] + if is_last_day.all(): + day_of_month = "MonthEnd" + else: + raise ValueError("Inconsistent day of the month found in time column.") + + # Generate new index based on detected pattern + total_min, total_max = df[time_col].min(), df[time_col].max() + month_starts = pd.date_range(start=total_min.to_period("M").to_timestamp(), + end=total_max.to_period("M").to_timestamp(), + freq="MS") + + if day_of_month == "MonthEnd": + new_index_full = month_starts + pd.offsets.MonthEnd(0) + else: + new_index_full = month_starts.map(lambda d: d.replace(day=day_of_month)) + + return new_index_full + +def set_index_and_fill_missing_time_steps(df: pd.DataFrame, time_col: str, + frequency: str, + id_cols: Optional[List[str]] = None): + """ + Transform the input dataframe to an acceptable format for the GluonTS library. + + - Set the time column as the index + - Impute missing time steps between the min and max time steps + + :param df: the input dataframe that contains time_col + :param time_col: time column name + :param frequency: the frequency of the time series + :param id_cols: the column names of the identity columns for multi-series time series; None for single series + :return: single-series - transformed dataframe; + multi-series - dictionary of transformed dataframes, each key is the (concatenated) id of the time series + """ + total_min, total_max = df[time_col].min(), df[time_col].max() + + # We need to adjust the frequency for pd.date_range if it is weekly, + # otherwise it would always be "W-SUN" + if frequency.upper() == "W": + weekday_name = total_min.strftime("%a").upper() # e.g., "FRI" + frequency = f"W-{weekday_name}" + + new_index_full = validate_and_generate_index(df=df, time_col=time_col, frequency=frequency) + + if id_cols is not None: + df_dict = {} + for grouped_id, grouped_df in df.groupby(id_cols): + if isinstance(grouped_id, tuple): + ts_id = "-".join([str(x) for x in grouped_id]) + else: + ts_id = str(grouped_id) + df_dict[ts_id] = (grouped_df.set_index(time_col).sort_index() + .reindex(new_index_full).drop(id_cols, axis=1)) + + return df_dict + + df = df.set_index(time_col).sort_index() + + # Fill in missing time steps between the min and max time steps + return df.reindex(new_index_full) diff --git a/runtime/databricks/automl_runtime/forecast/model.py b/runtime/databricks/automl_runtime/forecast/model.py index 2ba5fe66..4b1aa25c 100644 --- a/runtime/databricks/automl_runtime/forecast/model.py +++ b/runtime/databricks/automl_runtime/forecast/model.py @@ -57,10 +57,14 @@ def mlflow_forecast_log_model(forecast_model: ForecastModel, :param forecast_model: Forecast model wrapper :param sample_input: sample input Dataframes for model inference """ - # log the model without signature if infer_signature is failed. + # TODO: [ML-46185] we should not be logging without a signature since it cannot be registered to UC then try: signature = forecast_model.infer_signature(sample_input) except Exception: # noqa signature = None - mlflow.pyfunc.log_model("model", conda_env=forecast_model.model_env, - python_model=forecast_model, signature=signature) + mlflow.pyfunc.log_model( + artifact_path="model", + conda_env=forecast_model.model_env, + python_model=forecast_model, + signature=signature + ) diff --git a/runtime/databricks/automl_runtime/forecast/pmdarima/model.py b/runtime/databricks/automl_runtime/forecast/pmdarima/model.py index c99d5d01..64e5377d 100644 --- a/runtime/databricks/automl_runtime/forecast/pmdarima/model.py +++ b/runtime/databricks/automl_runtime/forecast/pmdarima/model.py @@ -28,13 +28,17 @@ from databricks.automl_runtime.forecast.model import ForecastModel, mlflow_forecast_log_model from databricks.automl_runtime.forecast.utils import calculate_period_differences, is_frequency_consistency, \ make_future_dataframe, make_single_future_dataframe +from databricks.automl_runtime import version +ARIMA_ADDITIONAL_PIP_DEPS = [ + f"pmdarima=={pmdarima.__version__}", + f"pandas=={pd.__version__}", + f"databricks-automl-runtime=={version.__version__}" +] + ARIMA_CONDA_ENV = _mlflow_conda_env( - additional_pip_deps=[ - f"pmdarima=={pmdarima.__version__}", - f"pandas=={pd.__version__}", - ] + additional_pip_deps=ARIMA_ADDITIONAL_PIP_DEPS ) diff --git a/runtime/databricks/automl_runtime/forecast/pmdarima/training.py b/runtime/databricks/automl_runtime/forecast/pmdarima/training.py index a3f6ba8a..23b22c9a 100644 --- a/runtime/databricks/automl_runtime/forecast/pmdarima/training.py +++ b/runtime/databricks/automl_runtime/forecast/pmdarima/training.py @@ -35,7 +35,8 @@ class ArimaEstimator: """ def __init__(self, horizon: int, frequency_unit: str, metric: str, seasonal_periods: List[int], - num_folds: int = 20, max_steps: int = 150, exogenous_cols: Optional[List[str]] = None) -> None: + num_folds: int = 20, max_steps: int = 150, exogenous_cols: Optional[List[str]] = None, + split_cutoff: Optional[pd.Timestamp] = None) -> None: """ :param horizon: Number of periods to forecast forward :param frequency_unit: Frequency of the time series @@ -45,6 +46,10 @@ def __init__(self, horizon: int, frequency_unit: str, metric: str, seasonal_peri :param max_steps: Max steps for stepwise auto_arima :param exogenous_cols: Optional list of column names of exogenous variables. If provided, these columns are used as additional features in arima model. + :param split_cutoff: Optional cutoff specified by user. If provided, + it is the starting point of cutoffs for cross validation. + For tuning job, it is the cutoff between train and validate split. + For training job, it is the cutoff bewteen validate and test split. """ self._horizon = horizon self._frequency_unit = OFFSET_ALIAS_MAP[frequency_unit] @@ -53,6 +58,7 @@ def __init__(self, horizon: int, frequency_unit: str, metric: str, seasonal_peri self._num_folds = num_folds self._max_steps = max_steps self._exogenous_cols = exogenous_cols + self._split_cutoff = split_cutoff def fit(self, df: pd.DataFrame) -> pd.DataFrame: """ @@ -88,12 +94,20 @@ def fit(self, df: pd.DataFrame) -> pd.DataFrame: # so the minimum valid seasonality period is always 1 validation_horizon = utils.get_validation_horizon(history_pd, self._horizon, self._frequency_unit) - cutoffs = utils.generate_cutoffs( - history_pd, - horizon=validation_horizon, - unit=self._frequency_unit, - num_folds=self._num_folds, - ) + if self._split_cutoff: + cutoffs = utils.generate_custom_cutoffs( + history_pd, + horizon=validation_horizon, + unit=self._frequency_unit, + split_cutoff=self._split_cutoff + ) + else: + cutoffs = utils.generate_cutoffs( + history_pd, + horizon=validation_horizon, + unit=self._frequency_unit, + num_folds=self._num_folds, + ) result = self._fit_predict(history_pd, cutoffs=cutoffs, seasonal_period=m, max_steps=self._max_steps) metric = result["metrics"]["smape"] diff --git a/runtime/databricks/automl_runtime/forecast/prophet/forecast.py b/runtime/databricks/automl_runtime/forecast/prophet/forecast.py index 643efe98..706700f2 100644 --- a/runtime/databricks/automl_runtime/forecast/prophet/forecast.py +++ b/runtime/databricks/automl_runtime/forecast/prophet/forecast.py @@ -91,7 +91,8 @@ def __init__(self, horizon: int, frequency_unit: str, metric: str, interval_widt algo=hyperopt.tpe.suggest, num_folds: int = 5, max_eval: int = 10, trial_timeout: int = None, random_state: int = 0, is_parallel: bool = True, - regressors = None, **prophet_kwargs) -> None: + regressors = None, + split_cutoff: Optional[pd.Timestamp] = None, **prophet_kwargs) -> None: """ Initialization @@ -108,6 +109,10 @@ def __init__(self, horizon: int, frequency_unit: str, metric: str, interval_widt :param random_state: random seed for hyperopt :param is_parallel: Indicators to decide that whether run hyperopt in :param regressors: list of column names of external regressors + :param split_cutoff: Optional cutoff specified by user. If provided, + it is the starting point of cutoffs for cross validation. + For tuning job, it is the cutoff between train and validate split. + For training job, it is the cutoff bewteen validate and test split. :param prophet_kwargs: Optional keyword arguments for Prophet model. For information about the parameters see: `The Prophet source code `_. @@ -125,6 +130,7 @@ def __init__(self, horizon: int, frequency_unit: str, metric: str, interval_widt self._timeout = trial_timeout self._is_parallel = is_parallel self._regressors = regressors + self._split_cutoff = split_cutoff self._prophet_kwargs = prophet_kwargs def fit(self, df: pd.DataFrame) -> pd.DataFrame: @@ -139,12 +145,20 @@ def fit(self, df: pd.DataFrame) -> pd.DataFrame: seasonality_mode = ["additive", "multiplicative"] validation_horizon = utils.get_validation_horizon(df, self._horizon, self._frequency_unit) - cutoffs = utils.generate_cutoffs( - df.reset_index(drop=True), - horizon=validation_horizon, - unit=self._frequency_unit, - num_folds=self._num_folds, - ) + if self._split_cutoff: + cutoffs = utils.generate_custom_cutoffs( + df.reset_index(drop=True), + horizon=validation_horizon, + unit=self._frequency_unit, + split_cutoff=self._split_cutoff + ) + else: + cutoffs = utils.generate_cutoffs( + df.reset_index(drop=True), + horizon=validation_horizon, + unit=self._frequency_unit, + num_folds=self._num_folds, + ) train_fn = partial(_prophet_fit_predict, history_pd=df, horizon=validation_horizon, frequency=self._frequency_unit, cutoffs=cutoffs, diff --git a/runtime/databricks/automl_runtime/forecast/prophet/model.py b/runtime/databricks/automl_runtime/forecast/prophet/model.py index ed155a3f..f678ce6b 100644 --- a/runtime/databricks/automl_runtime/forecast/prophet/model.py +++ b/runtime/databricks/automl_runtime/forecast/prophet/model.py @@ -29,12 +29,14 @@ from databricks.automl_runtime.forecast.utils import is_quaterly_alias, make_future_dataframe -PROPHET_CONDA_ENV = _mlflow_conda_env( - additional_pip_deps=[ +PROPHET_ADDITIONAL_PIP_DEPS = [ f"prophet=={prophet.__version__}", f"cloudpickle=={cloudpickle.__version__}", f"databricks-automl-runtime=={version.__version__}", ] + +PROPHET_CONDA_ENV = _mlflow_conda_env( + additional_pip_deps=PROPHET_ADDITIONAL_PIP_DEPS ) diff --git a/runtime/databricks/automl_runtime/forecast/utils.py b/runtime/databricks/automl_runtime/forecast/utils.py index 30def4f1..36016f26 100644 --- a/runtime/databricks/automl_runtime/forecast/utils.py +++ b/runtime/databricks/automl_runtime/forecast/utils.py @@ -187,6 +187,38 @@ def generate_cutoffs(df: pd.DataFrame, horizon: int, unit: str, ) return list(reversed(result)) +def generate_custom_cutoffs(df: pd.DataFrame, horizon: int, unit: str, + split_cutoff: pd.Timestamp) -> List[pd.Timestamp]: + """ + Generate custom cutoff times for cross validation based on user-specified split cutoff. + Period (step size) is 1. + :param df: pd.DataFrame of the historical data. + :param horizon: int number of time into the future for forecasting. + :param unit: frequency unit of the time series, which must be a pandas offset alias. + :param split_cutoff: the user-specified cutoff, as the starting point of cutoffs. + For tuning job, it is the cutoff between train and validate split. + For training job, it is the cutoff bewteen validate and test split. + :return: list of pd.Timestamp cutoffs for cross-validation. + """ + # TODO: [ML-43528] expose period as input. + period = 1 + period_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[unit])*period + horizon_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[unit])*horizon + + # First cutoff is the cutoff bewteen splits + cutoff = split_cutoff + result = [] + max_cutoff = max(df["ds"]) - horizon_dateoffset + while cutoff <= max_cutoff: + # If data does not exist in data range (cutoff, cutoff + horizon_dateoffset] + if (not (((df["ds"] > cutoff) & (df["ds"] <= cutoff + horizon_dateoffset)).any())): + # Next cutoff point is "next date after cutoff in data - horizon_dateoffset" + closest_date = df[df["ds"] > cutoff].min()["ds"] + cutoff = closest_date - horizon_dateoffset + result.append(cutoff) + cutoff += period_dateoffset + return result + def is_quaterly_alias(freq: str): return freq in QUATERLY_OFFSET_ALIAS diff --git a/runtime/databricks/automl_runtime/version.py b/runtime/databricks/automl_runtime/version.py index 4a18da24..d1e4e23a 100644 --- a/runtime/databricks/automl_runtime/version.py +++ b/runtime/databricks/automl_runtime/version.py @@ -14,4 +14,4 @@ # limitations under the License. # -__version__ = "0.2.20" # pragma: no cover +__version__ = "0.2.20.5" # pragma: no cover diff --git a/runtime/environment.txt b/runtime/environment.txt index a52cc682..e286b7fb 100644 --- a/runtime/environment.txt +++ b/runtime/environment.txt @@ -2,8 +2,10 @@ # * Keep dependencies sorted. category_encoders==2.6.0 +gluonts[torch]==0.15.1 holidays==0.28 hyperopt==0.2.7 +lightning==2.0.1 mlflow==2.0.1 numpy==1.23.5 pandas==1.5.3 @@ -11,4 +13,5 @@ pmdarima==2.0.3 prophet==1.1.4 pyarrow==8.0.0 scikit-learn==1.1.1 +torch==2.0.1 wrapt==1.14.1 diff --git a/runtime/requirements.txt b/runtime/requirements.txt index c601c6de..d612c67b 100644 --- a/runtime/requirements.txt +++ b/runtime/requirements.txt @@ -2,6 +2,7 @@ # * Keep dependencies sorted. category_encoders +gluonts holidays hyperopt mlflow @@ -13,4 +14,6 @@ prophet pyarrow requests scikit-learn +torch +lightning wrapt diff --git a/runtime/setup.py b/runtime/setup.py index d379e94e..17f162c7 100644 --- a/runtime/setup.py +++ b/runtime/setup.py @@ -46,6 +46,7 @@ "databricks", "databricks.automl_runtime", "databricks.automl_runtime.forecast", + "databricks.automl_runtime.forecast.deepar", "databricks.automl_runtime.forecast.pmdarima", "databricks.automl_runtime.forecast.prophet", "databricks.automl_runtime.hyperopt", diff --git a/runtime/tests/automl_runtime/forecast/deepar/__init__.py b/runtime/tests/automl_runtime/forecast/deepar/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/runtime/tests/automl_runtime/forecast/deepar/model_test.py b/runtime/tests/automl_runtime/forecast/deepar/model_test.py new file mode 100644 index 00000000..44894b93 --- /dev/null +++ b/runtime/tests/automl_runtime/forecast/deepar/model_test.py @@ -0,0 +1,308 @@ +# +# Copyright (C) 2024 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest + +import mlflow +import pandas as pd +import torch +import torch.nn as nn +from gluonts.dataset.field_names import FieldName +from gluonts.transform import InstanceSplitter, TestSplitSampler +from gluonts.torch.model.predictor import PyTorchPredictor + +from databricks.automl_runtime.forecast.deepar.model import ( + DeepARModel, + mlflow_deepar_log_model, + DEEPAR_ADDITIONAL_PIP_DEPS +) + + +class TestDeepARModel(unittest.TestCase): + @classmethod + def setUpClass(cls) -> None: + # Adapted from https://github.com/awslabs/gluonts/blob/dev/test/torch/model/test_torch_predictor.py + class RandomNetwork(nn.Module): + def __init__( + self, + prediction_length: int, + context_length: int, + ) -> None: + super().__init__() + self.prediction_length = prediction_length + self.context_length = context_length + self.net = nn.Linear(context_length, prediction_length) + torch.nn.init.uniform_(self.net.weight, -1.0, 1.0) + + def forward(self, past_target): + out = self.net(past_target.float()) + return out.unsqueeze(1) + + cls.context_length = 5 + cls.prediction_length = 5 + + cls.pred_net = RandomNetwork( + prediction_length=cls.context_length, context_length=cls.context_length + ) + + cls.transformation = InstanceSplitter( + target_field=FieldName.TARGET, + is_pad_field=FieldName.IS_PAD, + start_field=FieldName.START, + forecast_start_field=FieldName.FORECAST_START, + instance_sampler=TestSplitSampler(), + past_length=cls.context_length, + future_length=cls.prediction_length, + ) + + cls.model = PyTorchPredictor( + prediction_length=cls.prediction_length, + input_names=["past_target"], + prediction_net=cls.pred_net, + batch_size=16, + input_transform=cls.transformation, + device="cpu", + ) + + def _check_requirements(self, run_id: str): + # read requirements.txt from the run + requirements_path = mlflow.artifacts.download_artifacts(f"runs:/{run_id}/model/requirements.txt") + with open(requirements_path, "r") as f: + requirements = f.read() + # check if all additional dependencies are logged + for dependency in DEEPAR_ADDITIONAL_PIP_DEPS: + self.assertIn(dependency, requirements, f"requirements.txt should contain {dependency} but got {requirements}") + + def test_model_save_and_load_single_series(self): + target_col = "sales" + time_col = "date" + + deepar_model = DeepARModel( + model=self.model, + horizon=self.prediction_length, + frequency="d", + num_samples=1, + target_col=target_col, + time_col=time_col, + ) + + num_rows = 10 + sample_input = pd.concat( + [ + pd.to_datetime( + pd.Series(range(num_rows), name=time_col).apply( + lambda i: f"2020-10-{3 * i + 1}" + ) + ), + pd.Series(range(num_rows), name=target_col), + ], + axis=1, + ) + + with mlflow.start_run() as run: + mlflow_deepar_log_model(deepar_model, sample_input) + + run_id = run.info.run_id + + # check if all additional dependencies are logged + self._check_requirements(run_id) + + # load the model and predict + loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") + pred_df = loaded_model.predict(sample_input) + + self.assertEqual(pred_df.columns.tolist(), [time_col, "yhat"]) + self.assertEqual(len(pred_df), self.prediction_length) + self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max()) + + def test_model_save_and_load_multi_series(self): + target_col = "sales" + time_col = "date" + id_col = "store" + + deepar_model = DeepARModel( + model=self.model, + horizon=self.prediction_length, + num_samples=1, + frequency="d", + target_col=target_col, + time_col=time_col, + id_cols=[id_col], + ) + + num_rows_per_ts = 10 + sample_input_base = pd.concat( + [ + pd.to_datetime( + pd.Series(range(num_rows_per_ts), name=time_col).apply( + lambda i: f"2020-10-{3 * i + 1}" + ) + ), + pd.Series(range(num_rows_per_ts), name=target_col), + ], + axis=1, + ) + sample_input = pd.concat([sample_input_base.copy(), sample_input_base.copy()], ignore_index=True) + sample_input[id_col] = [1] * num_rows_per_ts + [2] * num_rows_per_ts + + with mlflow.start_run() as run: + mlflow_deepar_log_model(deepar_model, sample_input) + + run_id = run.info.run_id + + # check if all additional dependencies are logged + self._check_requirements(run_id) + + # load the model and predict + loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") + + pred_df = loaded_model.predict(sample_input) + + self.assertEqual(pred_df.columns.tolist(), [time_col, "yhat", id_col]) + self.assertEqual(len(pred_df), self.prediction_length * 2) + self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max()) + + def test_model_save_and_load_multi_series_multi_id_cols(self): + target_col = "sales" + time_col = "date" + id_cols = ["store", "dept"] + + deepar_model = DeepARModel( + model=self.model, + horizon=self.prediction_length, + num_samples=1, + frequency="d", + target_col=target_col, + time_col=time_col, + id_cols=id_cols, + ) + + num_rows_per_ts = 5 + sample_input_base = pd.concat( + [ + pd.to_datetime( + pd.Series(range(num_rows_per_ts), name=time_col).apply( + lambda i: f"2020-10-{3 * i + 1}" + ) + ), + pd.Series(range(num_rows_per_ts), name=target_col), + ], + axis=1, + ) + sample_input = pd.concat([sample_input_base.copy(), sample_input_base.copy(), + sample_input_base.copy(), sample_input_base.copy(),], ignore_index=True) + sample_input[id_cols[0]] = ['A'] * (2 * num_rows_per_ts) + ['B'] * (2 * num_rows_per_ts) + sample_input[id_cols[1]] = (['X'] * num_rows_per_ts + ['Y'] * num_rows_per_ts) * 2 + + with mlflow.start_run() as run: + mlflow_deepar_log_model(deepar_model, sample_input) + run_id = run.info.run_id + + # load the model and predict + loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") + pred_df = loaded_model.predict(sample_input) + + self.assertEqual(pred_df.columns.tolist(), [time_col, "yhat", "store-dept"]) + self.assertEqual(len(pred_df), self.prediction_length * 4) + self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max()) + + def test_model_prediction_with_duplicate_timestamps(self): + """ + Test that the model correctly handles and averages multiple rows with the same timestamp + when identity columns are not provided. + """ + target_col = "sales" + time_col = "date" + + deepar_model = DeepARModel( + model=self.model, + horizon=self.prediction_length, + frequency="d", + num_samples=1, + target_col=target_col, + time_col=time_col, + ) + + # Create sample input with duplicate timestamps + dates = pd.to_datetime([ + "2020-10-01", "2020-10-01", # duplicate date with different values + "2020-10-04", "2020-10-04", "2020-10-04", # triple duplicate + "2020-10-07" # single entry + ]) + + sales = [10, 20, # should average to 15 + 30, 60, 90, # should average to 60 + 100] # single value stays 100 + + sample_input = pd.DataFrame({ + time_col: dates, + target_col: sales + }) + + with mlflow.start_run() as run: + mlflow_deepar_log_model(deepar_model, sample_input) + + run_id = run.info.run_id + + # Load the model and predict + loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") + pred_df = loaded_model.predict(sample_input) + + # Verify the prediction output format + self.assertEqual(pred_df.columns.tolist(), [time_col, "yhat"]) + self.assertEqual(len(pred_df), self.prediction_length) + self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max()) + + def test_model_prediction_with_monthly_data(self): + target_col = "sales" + time_col = "date" + + deepar_model = DeepARModel( + model=self.model, + horizon=self.prediction_length, + frequency="MS", + num_samples=1, + target_col=target_col, + time_col=time_col, + ) + + # Create sample input with duplicate timestamps + dates = pd.to_datetime([ + "2020-10-01", "2020-11-01", "2020-12-01", + "2021-01-01", "2021-02-01", "2021-03-01" + ]) + + sales = [10, 20, 30, + 60, 90, 100] + + sample_input = pd.DataFrame({ + time_col: dates, + target_col: sales + }) + + with mlflow.start_run() as run: + mlflow_deepar_log_model(deepar_model, sample_input) + + run_id = run.info.run_id + + # Load the model and predict + loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") + pred_df = loaded_model.predict(sample_input) + + # Verify the prediction output format + self.assertEqual(pred_df.columns.tolist(), [time_col, "yhat"]) + self.assertEqual(len(pred_df), self.prediction_length) + self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max()) \ No newline at end of file diff --git a/runtime/tests/automl_runtime/forecast/deepar/utils_test.py b/runtime/tests/automl_runtime/forecast/deepar/utils_test.py new file mode 100644 index 00000000..bd9a6551 --- /dev/null +++ b/runtime/tests/automl_runtime/forecast/deepar/utils_test.py @@ -0,0 +1,251 @@ +# +# Copyright (C) 2024 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import unittest + +import pandas as pd + +from databricks.automl_runtime.forecast.deepar.utils import set_index_and_fill_missing_time_steps + + +class TestDeepARUtils(unittest.TestCase): + def test_single_series_filled(self): + target_col = "sales" + time_col = "date" + num_rows = 10 + + base_df = pd.concat( + [ + pd.to_datetime( + pd.Series(range(num_rows), name=time_col).apply( + lambda i: f"2020-10-{i + 1}" + ) + ), + pd.Series(range(num_rows), name=target_col), + ], + axis=1, + ) + dropped_df = base_df.drop([4, 5]).reset_index(drop=True) + + transformed_df = set_index_and_fill_missing_time_steps(dropped_df, time_col, "D") + + expected_df = base_df.copy() + expected_df.loc[[4, 5], target_col] = float('nan') + expected_df = expected_df.set_index(time_col).rename_axis(None).asfreq("D") + + pd.testing.assert_frame_equal(transformed_df, expected_df) + + def test_multi_series_filled(self): + target_col = "sales" + time_col = "date" + id_col = "store" + + num_rows_per_ts = 10 + base_df = pd.concat( + [ + pd.to_datetime( + pd.Series(range(num_rows_per_ts), name=time_col).apply( + lambda i: f"2020-10-{i + 1}" + ) + ), + pd.Series(range(num_rows_per_ts), name=target_col), + ], + axis=1, + ) + dropped_base_df = base_df.drop([4, 5]).reset_index(drop=True) + dropped_df = pd.concat([dropped_base_df.copy(), dropped_base_df.copy()], ignore_index=True) + dropped_df[id_col] = [1] * (num_rows_per_ts - 2) + [2] * (num_rows_per_ts - 2) + + transformed_df_dict = set_index_and_fill_missing_time_steps(dropped_df, time_col, "D", id_cols=[id_col]) + self.assertEqual(transformed_df_dict.keys(), {"1", "2"}) + + expected_first_df = base_df.copy() + expected_first_df.loc[[4, 5], target_col] = float('nan') + expected_first_df = expected_first_df.set_index(time_col).rename_axis(None).asfreq("D") + + pd.testing.assert_frame_equal(transformed_df_dict["1"], expected_first_df) + + def test_multi_series_multi_id_cols_filled(self): + target_col = "sales" + time_col = "date" + id_cols = ["store", "dept"] + + num_rows_per_ts = 10 + base_df = pd.concat( + [ + pd.to_datetime( + pd.Series(range(num_rows_per_ts), name=time_col).apply( + lambda i: f"2020-10-{i + 1}" + ) + ), + pd.Series(range(num_rows_per_ts), name=target_col), + ], + axis=1, + ) + dropped_base_df = base_df.drop([4, 5]).reset_index(drop=True) + dropped_df = pd.concat([dropped_base_df.copy(), dropped_base_df.copy(), + dropped_base_df.copy(), dropped_base_df.copy()], ignore_index=True) + dropped_df[id_cols[0]] = ([1] * (num_rows_per_ts - 2) + [2] * (num_rows_per_ts - 2)) * 2 + dropped_df[id_cols[1]] = [1] * (2 * (num_rows_per_ts - 2)) + [2] * (2 * (num_rows_per_ts - 2)) + + transformed_df_dict = set_index_and_fill_missing_time_steps(dropped_df, time_col, "D", id_cols=id_cols) + self.assertEqual(transformed_df_dict.keys(), {"1-1", "1-2", "2-1", "2-2"}) + + expected_first_df = base_df.copy() + expected_first_df.loc[[4, 5], target_col] = float('nan') + expected_first_df = expected_first_df.set_index(time_col).rename_axis(None).asfreq("D") + + pd.testing.assert_frame_equal(transformed_df_dict["1-1"], expected_first_df) + + def test_single_series_week_day_index(self): + target_col = "sales" + time_col = "date" + num_weeks = 10 + + # Starting from first Friday in 2020 + base_dates = pd.date_range( + start='2020-01-03', # First Friday of 2020 + periods=num_weeks, + freq='W-FRI' # Weekly frequency starting Friday + ) + + base_df = pd.DataFrame({ + time_col: base_dates, + target_col: range(num_weeks) + }) + + # Create a dataframe with missing weeks (drop weeks 3 and 4) + dropped_df = base_df.drop([3, 4]).reset_index(drop=True) + + # Transform the dataframe + transformed_df = set_index_and_fill_missing_time_steps( + dropped_df, + time_col, + "W" # Weekly frequency **without** specifying Friday + ) + + # Create expected dataframe + expected_df = base_df.copy() + expected_df.loc[[3, 4], target_col] = float('nan') + expected_df = expected_df.set_index(time_col).rename_axis(None).asfreq("W-FRI") + + # Assert equality + pd.testing.assert_frame_equal(transformed_df, expected_df) + + def test_single_series_month_start_index(self): + target_col = "sales" + time_col = "date" + num_months = 24 + + # Starting from first day of January 2020 + base_dates = pd.date_range( + start='2020-01-01', + periods=num_months, + freq='MS' + ) + + base_df = pd.DataFrame({ + time_col: base_dates, + target_col: range(num_months) + }) + + # Create a dataframe with missing months (drop months 3 and 4) + dropped_df = base_df.drop([3, 4]).reset_index(drop=True) + + # Transform the dataframe + transformed_df = set_index_and_fill_missing_time_steps( + dropped_df, + time_col, + "MS" # Monthly frequency + ) + + # Create expected dataframe + expected_df = base_df.copy() + expected_df.loc[[3, 4], target_col] = float('nan') + expected_df = expected_df.set_index(time_col).rename_axis(None) + + # Assert equality + pd.testing.assert_frame_equal(transformed_df, expected_df) + + def test_single_series_month_mid_index(self): + target_col = "sales" + time_col = "date" + num_months = 24 + + # Starting from fifteenth day of January 2020 + base_dates = pd.date_range( + start='2020-01-01', + periods=num_months, + freq='MS' + ) + pd.DateOffset(days=14) + + base_df = pd.DataFrame({ + time_col: base_dates, + target_col: range(num_months) + }) + + # Create a dataframe with missing months (drop months 3 and 4) + dropped_df = base_df.drop([3, 4]).reset_index(drop=True) + + # Transform the dataframe + transformed_df = set_index_and_fill_missing_time_steps( + dropped_df, + time_col, + "MS" + ) + + # Create expected dataframe + expected_df = base_df.copy() + expected_df.loc[[3, 4], target_col] = float('nan') + expected_df = expected_df.set_index(time_col).rename_axis(None) + + # Assert equality + pd.testing.assert_frame_equal(transformed_df, expected_df) + + def test_single_series_month_end_index(self): + target_col = "sales" + time_col = "date" + num_months = 24 + + # Starting from end day of January 2020 + base_dates = pd.date_range( + start='2020-01-01', + periods=num_months, + freq='M' + ) + + base_df = pd.DataFrame({ + time_col: base_dates, + target_col: range(num_months) + }) + + # Create a dataframe with missing months (drop months 3 and 4) + dropped_df = base_df.drop([3, 4]).reset_index(drop=True) + + # Transform the dataframe + transformed_df = set_index_and_fill_missing_time_steps( + dropped_df, + time_col, + "MS" # Monthly frequency + ) + + # Create expected dataframe + expected_df = base_df.copy() + expected_df.loc[[3, 4], target_col] = float('nan') + expected_df = expected_df.set_index(time_col).rename_axis(None) + + # Assert equality + pd.testing.assert_frame_equal(transformed_df, expected_df) + diff --git a/runtime/tests/automl_runtime/forecast/pmdarima/model_test.py b/runtime/tests/automl_runtime/forecast/pmdarima/model_test.py index f7616787..3c6d0d40 100644 --- a/runtime/tests/automl_runtime/forecast/pmdarima/model_test.py +++ b/runtime/tests/automl_runtime/forecast/pmdarima/model_test.py @@ -25,8 +25,13 @@ from mlflow.protos.databricks_pb2 import ErrorCode, INVALID_PARAMETER_VALUE from pmdarima.arima import ARIMA -from databricks.automl_runtime.forecast.pmdarima.model import ArimaModel, MultiSeriesArimaModel, AbstractArimaModel, \ - mlflow_arima_log_model +from databricks.automl_runtime.forecast.pmdarima.model import ( + ArimaModel, + MultiSeriesArimaModel, + AbstractArimaModel, + mlflow_arima_log_model, + ARIMA_ADDITIONAL_PIP_DEPS, +) class TestArimaModel(unittest.TestCase): @@ -438,6 +443,11 @@ def test_mlflow_arima_log_model(self): # Load the saved model from mlflow run_id = run.info.run_id + + # Check additonal requirements logged correctly + self._check_requirements(run_id) + + # Load the model loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") # Make sure can make forecasts with the saved model @@ -460,6 +470,11 @@ def test_mlflow_arima_log_model_multiseries(self): # Load the saved model from mlflow run_id = run.info.run_id + + # Check additonal requirements logged correctly + self._check_requirements(run_id) + + # Load the model loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") # Make sure can make forecasts with the saved model @@ -473,3 +488,12 @@ def test_mlflow_arima_log_model_multiseries(self): # Make sure can make forecasts for one-row dataframe loaded_model.predict(test_df[0:1]) + + def _check_requirements(self, run_id: str): + # read requirements.txt from the run + requirements_path = mlflow.artifacts.download_artifacts(f"runs:/{run_id}/model/requirements.txt") + with open(requirements_path, "r") as f: + requirements = f.read() + # check if all additional dependencies are logged + for dependency in ARIMA_ADDITIONAL_PIP_DEPS: + self.assertIn(dependency, requirements, f"requirements.txt should contain {dependency} but got {requirements}") diff --git a/runtime/tests/automl_runtime/forecast/pmdarima/training_test.py b/runtime/tests/automl_runtime/forecast/pmdarima/training_test.py index 36515719..1f1d07aa 100644 --- a/runtime/tests/automl_runtime/forecast/pmdarima/training_test.py +++ b/runtime/tests/automl_runtime/forecast/pmdarima/training_test.py @@ -72,6 +72,20 @@ def test_fit_success_with_exogenous(self): results_pd = arima_estimator.fit(self.df_with_exogenous) self.assertIn("smape", results_pd) self.assertIn("pickled_model", results_pd) + + def test_fit_success_with_split_cutoff(self): + for freq, df, split_cutoff in [['d', self.df, '2020-07-17 00:00:00'], + ['d', self.df_string_time, '2020-07-17 00:00:00'], + ['month', self.df_monthly, '2020-09-07 00:00:00']]: + arima_estimator = ArimaEstimator(horizon=1, + frequency_unit=freq, + metric="smape", + seasonal_periods=[1, 7], + num_folds=2, + split_cutoff=pd.Timestamp(split_cutoff)) + results_pd = arima_estimator.fit(df) + self.assertIn("smape", results_pd) + self.assertIn("pickled_model", results_pd) def test_fit_skip_too_long_seasonality(self): arima_estimator = ArimaEstimator(horizon=1, diff --git a/runtime/tests/automl_runtime/forecast/prophet/forecast_test.py b/runtime/tests/automl_runtime/forecast/prophet/forecast_test.py index 9cb6378b..d3b9478d 100644 --- a/runtime/tests/automl_runtime/forecast/prophet/forecast_test.py +++ b/runtime/tests/automl_runtime/forecast/prophet/forecast_test.py @@ -140,6 +140,38 @@ def test_training_with_extra_regressors(self): model_json = json.loads(results["model_json"][0]) self.assertListEqual(model_json["extra_regressors"][0], ["f1", "f2"]) + def test_training_with_split_cutoff(self): + test_spaces = [['D', self.df, '2020-07-10 00:00:00', 1e-6], + ['D', self.df_datetime_date, '2020-07-10 00:00:00', 1e-6], + ['D', self.df_string_time, '2020-07-10 00:00:00', 1e-6], + ['M', self.df_string_monthly_time, '2020-10-15 00:00:00', 1e-1], + ['Q', self.df_string_quarterly_time, '2022-04-15 00:00:00', 1e-1], + ['Y', self.df_string_annually_time, '2021-01-15 00:00:00', 5e-1]] + for freq, df, split_cutoff, delta in test_spaces: + hyperopt_estim = ProphetHyperoptEstimator(horizon=1, + frequency_unit=freq, + metric="smape", + interval_width=0.8, + country_holidays="US", + search_space=self.search_space, + num_folds=2, + trial_timeout=1000, + random_state=0, + is_parallel=False, + split_cutoff=pd.Timestamp(split_cutoff)) + results = hyperopt_estim.fit(df) + self.assertAlmostEqual(results["mse"][0], 0, delta=delta) + self.assertAlmostEqual(results["rmse"][0], 0, delta=delta) + self.assertAlmostEqual(results["mae"][0], 0, delta=delta) + self.assertAlmostEqual(results["mape"][0], 0, delta=delta) + self.assertAlmostEqual(results["mdape"][0], 0, delta=delta) + self.assertAlmostEqual(results["smape"][0], 0, delta=delta) + self.assertAlmostEqual(results["coverage"][0], 1, delta=delta) + # check the best result parameter is inside the search space + model_json = json.loads(results["model_json"][0]) + self.assertGreaterEqual(model_json["changepoint_prior_scale"], 0.1) + self.assertLessEqual(model_json["changepoint_prior_scale"], 0.5) + @patch("databricks.automl_runtime.forecast.prophet.forecast.fmin") @patch("databricks.automl_runtime.forecast.prophet.forecast.Trials") @patch("databricks.automl_runtime.forecast.prophet.forecast.partial") diff --git a/runtime/tests/automl_runtime/forecast/prophet/model_test.py b/runtime/tests/automl_runtime/forecast/prophet/model_test.py index a4b321a8..4ce65dae 100644 --- a/runtime/tests/automl_runtime/forecast/prophet/model_test.py +++ b/runtime/tests/automl_runtime/forecast/prophet/model_test.py @@ -32,12 +32,22 @@ ProphetModel, OFFSET_ALIAS_MAP, DATE_OFFSET_KEYWORD_MAP, + PROPHET_ADDITIONAL_PIP_DEPS ) PROPHET_MODEL_JSON = '{"growth": "linear", "n_changepoints": 6, "specified_changepoints": false, "changepoint_range": 0.8, "yearly_seasonality": "auto", "weekly_seasonality": "auto", "daily_seasonality": "auto", "seasonality_mode": "additive", "seasonality_prior_scale": 10.0, "changepoint_prior_scale": 0.05, "holidays_prior_scale": 10.0, "mcmc_samples": 0, "interval_width": 0.8, "uncertainty_samples": 1000, "y_scale": 8.0, "logistic_floor": false, "country_holidays": null, "component_modes": {"additive": ["weekly", "additive_terms", "extra_regressors_additive", "holidays"], "multiplicative": ["multiplicative_terms", "extra_regressors_multiplicative"]}, "changepoints": "{\\"name\\":\\"ds\\",\\"index\\":[1,2,3,4,5,6],\\"data\\":[\\"2020-10-04T00:00:00.000\\",\\"2020-10-07T00:00:00.000\\",\\"2020-10-10T00:00:00.000\\",\\"2020-10-13T00:00:00.000\\",\\"2020-10-16T00:00:00.000\\",\\"2020-10-19T00:00:00.000\\"]}", "history_dates": "{\\"name\\":\\"ds\\",\\"index\\":[0,1,2,3,4,5,6,7,8],\\"data\\":[\\"2020-10-01T00:00:00.000\\",\\"2020-10-04T00:00:00.000\\",\\"2020-10-07T00:00:00.000\\",\\"2020-10-10T00:00:00.000\\",\\"2020-10-13T00:00:00.000\\",\\"2020-10-16T00:00:00.000\\",\\"2020-10-19T00:00:00.000\\",\\"2020-10-22T00:00:00.000\\",\\"2020-10-25T00:00:00.000\\"]}", "train_holiday_names": null, "start": 1601510400.0, "t_scale": 2073600.0, "holidays": null, "history": "{\\"schema\\":{\\"fields\\":[{\\"name\\":\\"ds\\",\\"type\\":\\"datetime\\"},{\\"name\\":\\"y\\",\\"type\\":\\"integer\\"},{\\"name\\":\\"floor\\",\\"type\\":\\"integer\\"},{\\"name\\":\\"t\\",\\"type\\":\\"number\\"},{\\"name\\":\\"y_scaled\\",\\"type\\":\\"number\\"}],\\"pandas_version\\":\\"1.4.0\\"},\\"data\\":[{\\"ds\\":\\"2020-10-01T00:00:00.000\\",\\"y\\":0,\\"floor\\":0,\\"t\\":0.0,\\"y_scaled\\":0.0},{\\"ds\\":\\"2020-10-04T00:00:00.000\\",\\"y\\":1,\\"floor\\":0,\\"t\\":0.125,\\"y_scaled\\":0.125},{\\"ds\\":\\"2020-10-07T00:00:00.000\\",\\"y\\":2,\\"floor\\":0,\\"t\\":0.25,\\"y_scaled\\":0.25},{\\"ds\\":\\"2020-10-10T00:00:00.000\\",\\"y\\":3,\\"floor\\":0,\\"t\\":0.375,\\"y_scaled\\":0.375},{\\"ds\\":\\"2020-10-13T00:00:00.000\\",\\"y\\":4,\\"floor\\":0,\\"t\\":0.5,\\"y_scaled\\":0.5},{\\"ds\\":\\"2020-10-16T00:00:00.000\\",\\"y\\":5,\\"floor\\":0,\\"t\\":0.625,\\"y_scaled\\":0.625},{\\"ds\\":\\"2020-10-19T00:00:00.000\\",\\"y\\":6,\\"floor\\":0,\\"t\\":0.75,\\"y_scaled\\":0.75},{\\"ds\\":\\"2020-10-22T00:00:00.000\\",\\"y\\":7,\\"floor\\":0,\\"t\\":0.875,\\"y_scaled\\":0.875},{\\"ds\\":\\"2020-10-25T00:00:00.000\\",\\"y\\":8,\\"floor\\":0,\\"t\\":1.0,\\"y_scaled\\":1.0}]}", "train_component_cols": "{\\"schema\\":{\\"fields\\":[{\\"name\\":\\"additive_terms\\",\\"type\\":\\"integer\\"},{\\"name\\":\\"weekly\\",\\"type\\":\\"integer\\"},{\\"name\\":\\"multiplicative_terms\\",\\"type\\":\\"integer\\"}],\\"pandas_version\\":\\"1.4.0\\"},\\"data\\":[{\\"additive_terms\\":1,\\"weekly\\":1,\\"multiplicative_terms\\":0},{\\"additive_terms\\":1,\\"weekly\\":1,\\"multiplicative_terms\\":0},{\\"additive_terms\\":1,\\"weekly\\":1,\\"multiplicative_terms\\":0},{\\"additive_terms\\":1,\\"weekly\\":1,\\"multiplicative_terms\\":0},{\\"additive_terms\\":1,\\"weekly\\":1,\\"multiplicative_terms\\":0},{\\"additive_terms\\":1,\\"weekly\\":1,\\"multiplicative_terms\\":0}]}", "changepoints_t": [0.125, 0.25, 0.375, 0.5, 0.625, 0.75], "seasonalities": [["weekly"], {"weekly": {"period": 7, "fourier_order": 3, "prior_scale": 10.0, "mode": "additive", "condition_name": null}}], "extra_regressors": [[], {}], "fit_kwargs": {}, "params": {"lp__": [[202.053]], "k": [[1.19777]], "m": [[0.0565623]], "delta": [[-0.86152, 0.409957, -0.103241, 0.528979, 0.535181, -0.509356]], "sigma_obs": [[2.53056e-13]], "beta": [[-0.00630566, 0.016248, 0.0318587, -0.068705, 0.0029986, -0.00410522]], "trend": [[0.0565623, 0.206283, 0.248314, 0.341589, 0.421959, 0.568452, 0.781842, 0.931562, 1.08128]]}, "__prophet_version": "1.1.1"}' - -class TestProphetModel(unittest.TestCase): +class BaseProphetModelTest(unittest.TestCase): + def _check_requirements(self, run_id: str): + # read requirements.txt from the run + requirements_path = mlflow.artifacts.download_artifacts(f"runs:/{run_id}/model/requirements.txt") + with open(requirements_path, "r") as f: + requirements = f.read() + # check if all additional dependencies are logged + for dependency in PROPHET_ADDITIONAL_PIP_DEPS: + self.assertIn(dependency, requirements, f"requirements.txt should contain {dependency} but got {requirements}") + +class TestProphetModel(BaseProphetModelTest): @classmethod def setUpClass(cls) -> None: num_rows = 9 @@ -74,8 +84,13 @@ def test_model_save_and_load(self): with mlflow.start_run() as run: mlflow_prophet_log_model(prophet_model) - # Load the saved model from mlflow + run_id = run.info.run_id + + # Check additonal requirements logged correctly + self._check_requirements(run_id) + + # Load the saved model from mlflow prophet_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") # Check the prediction with the saved model @@ -144,7 +159,7 @@ def test_validate_predict_cols(self): assert e.value.error_code == ErrorCode.Name(INTERNAL_ERROR) -class TestMultiSeriesProphetModel(unittest.TestCase): +class TestMultiSeriesProphetModel(BaseProphetModelTest): @classmethod def setUpClass(cls) -> None: cls.model_json = PROPHET_MODEL_JSON @@ -178,8 +193,13 @@ def test_model_save_and_load(self): with mlflow.start_run() as run: mlflow_prophet_log_model(self.prophet_model, sample_input=test_df) - # Load the saved model from mlflow + run_id = run.info.run_id + + # Check additonal requirements logged correctly + self._check_requirements(run_id) + + # Load the saved model from mlflow loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") # Check the prediction with the saved model @@ -239,9 +259,13 @@ def test_model_save_and_load_multi_ids(self): ) with mlflow.start_run() as run: mlflow_prophet_log_model(prophet_model, sample_input=test_df) + + run_id = run.info.run_id + + # Check additonal requirements logged correctly + self._check_requirements(run_id) # Load the saved model from mlflow - run_id = run.info.run_id loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") # Check the prediction with the saved model diff --git a/runtime/tests/automl_runtime/forecast/utils_test.py b/runtime/tests/automl_runtime/forecast/utils_test.py index e490f8ab..84bd94e0 100644 --- a/runtime/tests/automl_runtime/forecast/utils_test.py +++ b/runtime/tests/automl_runtime/forecast/utils_test.py @@ -22,7 +22,8 @@ from databricks.automl_runtime.forecast import DATE_OFFSET_KEYWORD_MAP from databricks.automl_runtime.forecast.utils import \ generate_cutoffs, get_validation_horizon, calculate_period_differences, \ - is_frequency_consistency, make_future_dataframe, make_single_future_dataframe + is_frequency_consistency, make_future_dataframe, make_single_future_dataframe, \ + generate_custom_cutoffs class TestGetValidationHorizon(unittest.TestCase): @@ -177,6 +178,80 @@ def test_generate_cutoffs_success_annualy(self): self.assertEqual([pd.Timestamp('2018-07-14 00:00:00'), pd.Timestamp('2019-07-14 00:00:00'), pd.Timestamp('2020-07-14 00:00:00')], cutoffs) +class TestTestGenerateCustomCutoffs(unittest.TestCase): + + def test_generate_custom_cutoffs_success_hourly(self): + df = pd.DataFrame( + pd.date_range(start="2020-07-01", periods=168, freq='h'), columns=["ds"] + ).rename_axis("y").reset_index() + expected_cutoffs = [pd.Timestamp('2020-07-07 13:00:00'), + pd.Timestamp('2020-07-07 14:00:00'), + pd.Timestamp('2020-07-07 15:00:00'), + pd.Timestamp('2020-07-07 16:00:00')] + cutoffs = generate_custom_cutoffs(df, horizon=7, unit="H", split_cutoff=pd.Timestamp('2020-07-07 13:00:00')) + self.assertEqual(expected_cutoffs, cutoffs) + + def test_generate_custom_cutoffs_success_daily(self): + df = pd.DataFrame( + pd.date_range(start="2020-07-01", end="2020-08-30", freq='d'), columns=["ds"] + ).rename_axis("y").reset_index() + cutoffs = generate_custom_cutoffs(df, horizon=7, unit="D", split_cutoff=pd.Timestamp('2020-08-21 00:00:00')) + self.assertEqual([pd.Timestamp('2020-08-21 00:00:00'), pd.Timestamp('2020-08-22 00:00:00'), pd.Timestamp('2020-08-23 00:00:00')], cutoffs) + + def test_generate_custom_cutoffs_success_small_horizon(self): + df = pd.DataFrame( + pd.date_range(start="2020-07-01", end="2020-08-30", freq='2d'), columns=["ds"] + ).rename_axis("y").reset_index() + cutoffs = generate_custom_cutoffs(df, horizon=1, unit="D", split_cutoff=pd.Timestamp('2020-08-26 00:00:00')) + self.assertEqual([pd.Timestamp('2020-08-27 00:00:00'), pd.Timestamp('2020-08-29 00:00:00')], cutoffs) + + def test_generate_custom_cutoffs_success_weekly(self): + df = pd.DataFrame( + pd.date_range(start="2020-07-01", periods=52, freq='W'), columns=["ds"] + ).rename_axis("y").reset_index() + cutoffs = generate_custom_cutoffs(df, horizon=7, unit="W", split_cutoff=pd.Timestamp('2021-04-25 00:00:00')) + self.assertEqual([pd.Timestamp('2021-04-25 00:00:00'), pd.Timestamp('2021-05-02 00:00:00'), pd.Timestamp('2021-05-09 00:00:00')], cutoffs) + + def test_generate_custom_cutoffs_success_monthly(self): + df = pd.DataFrame( + pd.date_range(start="2020-01-12", periods=24, freq=pd.DateOffset(months=1)), columns=["ds"] + ).rename_axis("y").reset_index() + cutoffs = generate_custom_cutoffs(df, horizon=7, unit="MS", split_cutoff=pd.Timestamp('2021-03-12 00:00:00')) + self.assertEqual([pd.Timestamp('2021-03-12 00:00:00'), pd.Timestamp('2021-04-12 00:00:00'), pd.Timestamp('2021-05-12 00:00:00')], cutoffs) + + def test_generate_custom_cutoffs_success_quaterly(self): + df = pd.DataFrame( + pd.date_range(start="2020-07-12", periods=9, freq=pd.DateOffset(months=3)), columns=["ds"] + ).rename_axis("y").reset_index() + cutoffs = generate_custom_cutoffs(df, horizon=7, unit="QS", split_cutoff=pd.Timestamp('2020-07-12 00:00:00')) + self.assertEqual([pd.Timestamp('2020-07-12 00:00:00'), pd.Timestamp('2020-10-12 00:00:00')], cutoffs) + + def test_generate_custom_cutoffs_success_annualy(self): + df = pd.DataFrame( + pd.date_range(start="2012-07-14", periods=10, freq=pd.DateOffset(years=1)), columns=["ds"] + ).rename_axis("y").reset_index() + cutoffs = generate_custom_cutoffs(df, horizon=7, unit="YS", split_cutoff=pd.Timestamp('2012-07-14 00:00:00')) + self.assertEqual([pd.Timestamp('2012-07-14 00:00:00'), pd.Timestamp('2013-07-14 00:00:00'), pd.Timestamp('2014-07-14 00:00:00')], cutoffs) + + def test_generate_custom_cutoffs_success_with_small_gaps(self): + df = pd.DataFrame( + pd.date_range(start="2020-07-01", periods=30, freq='3d'), columns=["ds"] + ).rename_axis("y").reset_index() + cutoffs = generate_custom_cutoffs(df, horizon=7, unit="D", split_cutoff=pd.Timestamp('2020-09-17 00:00:00')) + self.assertEqual([pd.Timestamp('2020-09-17 00:00:00'), + pd.Timestamp('2020-09-18 00:00:00'), + pd.Timestamp('2020-09-19 00:00:00')], cutoffs) + + def test_generate_custom_cutoffs_success_with_large_gaps(self): + df = pd.DataFrame( + pd.date_range(start="2020-07-01", periods=30, freq='9d'), columns=["ds"] + ).rename_axis("y").reset_index() + cutoffs = generate_custom_cutoffs(df, horizon=7, unit="D", split_cutoff=pd.Timestamp('2021-03-08 00:00:00')) + self.assertEqual([pd.Timestamp('2021-03-08 00:00:00'), + pd.Timestamp('2021-03-09 00:00:00'), + pd.Timestamp('2021-03-12 00:00:00')], cutoffs) + + class TestCalculatePeriodsAndFrequency(unittest.TestCase): def setUp(self) -> None: return super().setUp()