From 90a5cc326c14733f85dfd2b013366f29fa342ba1 Mon Sep 17 00:00:00 2001 From: Lan Zhang <159198357+Lanz-db@users.noreply.github.com> Date: Thu, 25 Jul 2024 10:01:32 -0700 Subject: [PATCH 01/11] [ML-42739] Add custom forecasting data splits for automl_runtime (#145) * init * add tests * fix tests * add comments * fix comments * flake * revert Optional * add comment * fix tests * fix tests * fix tests * fix tests * fix tests * add test print * fix * fix * update test * delete print * update comments * increase version number * fix --- .../forecast/pmdarima/training.py | 28 +++++-- .../forecast/prophet/forecast.py | 28 +++++-- .../automl_runtime/forecast/utils.py | 32 ++++++++ runtime/databricks/automl_runtime/version.py | 2 +- .../forecast/pmdarima/training_test.py | 14 ++++ .../forecast/prophet/forecast_test.py | 32 ++++++++ .../automl_runtime/forecast/utils_test.py | 77 ++++++++++++++++++- 7 files changed, 197 insertions(+), 16 deletions(-) diff --git a/runtime/databricks/automl_runtime/forecast/pmdarima/training.py b/runtime/databricks/automl_runtime/forecast/pmdarima/training.py index a3f6ba8a..23b22c9a 100644 --- a/runtime/databricks/automl_runtime/forecast/pmdarima/training.py +++ b/runtime/databricks/automl_runtime/forecast/pmdarima/training.py @@ -35,7 +35,8 @@ class ArimaEstimator: """ def __init__(self, horizon: int, frequency_unit: str, metric: str, seasonal_periods: List[int], - num_folds: int = 20, max_steps: int = 150, exogenous_cols: Optional[List[str]] = None) -> None: + num_folds: int = 20, max_steps: int = 150, exogenous_cols: Optional[List[str]] = None, + split_cutoff: Optional[pd.Timestamp] = None) -> None: """ :param horizon: Number of periods to forecast forward :param frequency_unit: Frequency of the time series @@ -45,6 +46,10 @@ def __init__(self, horizon: int, frequency_unit: str, metric: str, seasonal_peri :param max_steps: Max steps for stepwise auto_arima :param exogenous_cols: Optional list of column names of exogenous variables. If provided, these columns are used as additional features in arima model. + :param split_cutoff: Optional cutoff specified by user. If provided, + it is the starting point of cutoffs for cross validation. + For tuning job, it is the cutoff between train and validate split. + For training job, it is the cutoff bewteen validate and test split. 
""" self._horizon = horizon self._frequency_unit = OFFSET_ALIAS_MAP[frequency_unit] @@ -53,6 +58,7 @@ def __init__(self, horizon: int, frequency_unit: str, metric: str, seasonal_peri self._num_folds = num_folds self._max_steps = max_steps self._exogenous_cols = exogenous_cols + self._split_cutoff = split_cutoff def fit(self, df: pd.DataFrame) -> pd.DataFrame: """ @@ -88,12 +94,20 @@ def fit(self, df: pd.DataFrame) -> pd.DataFrame: # so the minimum valid seasonality period is always 1 validation_horizon = utils.get_validation_horizon(history_pd, self._horizon, self._frequency_unit) - cutoffs = utils.generate_cutoffs( - history_pd, - horizon=validation_horizon, - unit=self._frequency_unit, - num_folds=self._num_folds, - ) + if self._split_cutoff: + cutoffs = utils.generate_custom_cutoffs( + history_pd, + horizon=validation_horizon, + unit=self._frequency_unit, + split_cutoff=self._split_cutoff + ) + else: + cutoffs = utils.generate_cutoffs( + history_pd, + horizon=validation_horizon, + unit=self._frequency_unit, + num_folds=self._num_folds, + ) result = self._fit_predict(history_pd, cutoffs=cutoffs, seasonal_period=m, max_steps=self._max_steps) metric = result["metrics"]["smape"] diff --git a/runtime/databricks/automl_runtime/forecast/prophet/forecast.py b/runtime/databricks/automl_runtime/forecast/prophet/forecast.py index 643efe98..706700f2 100644 --- a/runtime/databricks/automl_runtime/forecast/prophet/forecast.py +++ b/runtime/databricks/automl_runtime/forecast/prophet/forecast.py @@ -91,7 +91,8 @@ def __init__(self, horizon: int, frequency_unit: str, metric: str, interval_widt algo=hyperopt.tpe.suggest, num_folds: int = 5, max_eval: int = 10, trial_timeout: int = None, random_state: int = 0, is_parallel: bool = True, - regressors = None, **prophet_kwargs) -> None: + regressors = None, + split_cutoff: Optional[pd.Timestamp] = None, **prophet_kwargs) -> None: """ Initialization @@ -108,6 +109,10 @@ def __init__(self, horizon: int, frequency_unit: str, metric: str, interval_widt :param random_state: random seed for hyperopt :param is_parallel: Indicators to decide that whether run hyperopt in :param regressors: list of column names of external regressors + :param split_cutoff: Optional cutoff specified by user. If provided, + it is the starting point of cutoffs for cross validation. + For tuning job, it is the cutoff between train and validate split. + For training job, it is the cutoff bewteen validate and test split. :param prophet_kwargs: Optional keyword arguments for Prophet model. For information about the parameters see: `The Prophet source code `_. 
@@ -125,6 +130,7 @@ def __init__(self, horizon: int, frequency_unit: str, metric: str, interval_widt self._timeout = trial_timeout self._is_parallel = is_parallel self._regressors = regressors + self._split_cutoff = split_cutoff self._prophet_kwargs = prophet_kwargs def fit(self, df: pd.DataFrame) -> pd.DataFrame: @@ -139,12 +145,20 @@ def fit(self, df: pd.DataFrame) -> pd.DataFrame: seasonality_mode = ["additive", "multiplicative"] validation_horizon = utils.get_validation_horizon(df, self._horizon, self._frequency_unit) - cutoffs = utils.generate_cutoffs( - df.reset_index(drop=True), - horizon=validation_horizon, - unit=self._frequency_unit, - num_folds=self._num_folds, - ) + if self._split_cutoff: + cutoffs = utils.generate_custom_cutoffs( + df.reset_index(drop=True), + horizon=validation_horizon, + unit=self._frequency_unit, + split_cutoff=self._split_cutoff + ) + else: + cutoffs = utils.generate_cutoffs( + df.reset_index(drop=True), + horizon=validation_horizon, + unit=self._frequency_unit, + num_folds=self._num_folds, + ) train_fn = partial(_prophet_fit_predict, history_pd=df, horizon=validation_horizon, frequency=self._frequency_unit, cutoffs=cutoffs, diff --git a/runtime/databricks/automl_runtime/forecast/utils.py b/runtime/databricks/automl_runtime/forecast/utils.py index 30def4f1..36016f26 100644 --- a/runtime/databricks/automl_runtime/forecast/utils.py +++ b/runtime/databricks/automl_runtime/forecast/utils.py @@ -187,6 +187,38 @@ def generate_cutoffs(df: pd.DataFrame, horizon: int, unit: str, ) return list(reversed(result)) +def generate_custom_cutoffs(df: pd.DataFrame, horizon: int, unit: str, + split_cutoff: pd.Timestamp) -> List[pd.Timestamp]: + """ + Generate custom cutoff times for cross validation based on user-specified split cutoff. + Period (step size) is 1. + :param df: pd.DataFrame of the historical data. + :param horizon: int number of time into the future for forecasting. + :param unit: frequency unit of the time series, which must be a pandas offset alias. + :param split_cutoff: the user-specified cutoff, as the starting point of cutoffs. + For tuning job, it is the cutoff between train and validate split. + For training job, it is the cutoff bewteen validate and test split. + :return: list of pd.Timestamp cutoffs for cross-validation. + """ + # TODO: [ML-43528] expose period as input. + period = 1 + period_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[unit])*period + horizon_dateoffset = pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[unit])*horizon + + # First cutoff is the cutoff bewteen splits + cutoff = split_cutoff + result = [] + max_cutoff = max(df["ds"]) - horizon_dateoffset + while cutoff <= max_cutoff: + # If data does not exist in data range (cutoff, cutoff + horizon_dateoffset] + if (not (((df["ds"] > cutoff) & (df["ds"] <= cutoff + horizon_dateoffset)).any())): + # Next cutoff point is "next date after cutoff in data - horizon_dateoffset" + closest_date = df[df["ds"] > cutoff].min()["ds"] + cutoff = closest_date - horizon_dateoffset + result.append(cutoff) + cutoff += period_dateoffset + return result + def is_quaterly_alias(freq: str): return freq in QUATERLY_OFFSET_ALIAS diff --git a/runtime/databricks/automl_runtime/version.py b/runtime/databricks/automl_runtime/version.py index 4a18da24..574eeb2e 100644 --- a/runtime/databricks/automl_runtime/version.py +++ b/runtime/databricks/automl_runtime/version.py @@ -14,4 +14,4 @@ # limitations under the License. 
# -__version__ = "0.2.20" # pragma: no cover +__version__ = "0.2.20.1" # pragma: no cover diff --git a/runtime/tests/automl_runtime/forecast/pmdarima/training_test.py b/runtime/tests/automl_runtime/forecast/pmdarima/training_test.py index 36515719..1f1d07aa 100644 --- a/runtime/tests/automl_runtime/forecast/pmdarima/training_test.py +++ b/runtime/tests/automl_runtime/forecast/pmdarima/training_test.py @@ -72,6 +72,20 @@ def test_fit_success_with_exogenous(self): results_pd = arima_estimator.fit(self.df_with_exogenous) self.assertIn("smape", results_pd) self.assertIn("pickled_model", results_pd) + + def test_fit_success_with_split_cutoff(self): + for freq, df, split_cutoff in [['d', self.df, '2020-07-17 00:00:00'], + ['d', self.df_string_time, '2020-07-17 00:00:00'], + ['month', self.df_monthly, '2020-09-07 00:00:00']]: + arima_estimator = ArimaEstimator(horizon=1, + frequency_unit=freq, + metric="smape", + seasonal_periods=[1, 7], + num_folds=2, + split_cutoff=pd.Timestamp(split_cutoff)) + results_pd = arima_estimator.fit(df) + self.assertIn("smape", results_pd) + self.assertIn("pickled_model", results_pd) def test_fit_skip_too_long_seasonality(self): arima_estimator = ArimaEstimator(horizon=1, diff --git a/runtime/tests/automl_runtime/forecast/prophet/forecast_test.py b/runtime/tests/automl_runtime/forecast/prophet/forecast_test.py index 9cb6378b..d3b9478d 100644 --- a/runtime/tests/automl_runtime/forecast/prophet/forecast_test.py +++ b/runtime/tests/automl_runtime/forecast/prophet/forecast_test.py @@ -140,6 +140,38 @@ def test_training_with_extra_regressors(self): model_json = json.loads(results["model_json"][0]) self.assertListEqual(model_json["extra_regressors"][0], ["f1", "f2"]) + def test_training_with_split_cutoff(self): + test_spaces = [['D', self.df, '2020-07-10 00:00:00', 1e-6], + ['D', self.df_datetime_date, '2020-07-10 00:00:00', 1e-6], + ['D', self.df_string_time, '2020-07-10 00:00:00', 1e-6], + ['M', self.df_string_monthly_time, '2020-10-15 00:00:00', 1e-1], + ['Q', self.df_string_quarterly_time, '2022-04-15 00:00:00', 1e-1], + ['Y', self.df_string_annually_time, '2021-01-15 00:00:00', 5e-1]] + for freq, df, split_cutoff, delta in test_spaces: + hyperopt_estim = ProphetHyperoptEstimator(horizon=1, + frequency_unit=freq, + metric="smape", + interval_width=0.8, + country_holidays="US", + search_space=self.search_space, + num_folds=2, + trial_timeout=1000, + random_state=0, + is_parallel=False, + split_cutoff=pd.Timestamp(split_cutoff)) + results = hyperopt_estim.fit(df) + self.assertAlmostEqual(results["mse"][0], 0, delta=delta) + self.assertAlmostEqual(results["rmse"][0], 0, delta=delta) + self.assertAlmostEqual(results["mae"][0], 0, delta=delta) + self.assertAlmostEqual(results["mape"][0], 0, delta=delta) + self.assertAlmostEqual(results["mdape"][0], 0, delta=delta) + self.assertAlmostEqual(results["smape"][0], 0, delta=delta) + self.assertAlmostEqual(results["coverage"][0], 1, delta=delta) + # check the best result parameter is inside the search space + model_json = json.loads(results["model_json"][0]) + self.assertGreaterEqual(model_json["changepoint_prior_scale"], 0.1) + self.assertLessEqual(model_json["changepoint_prior_scale"], 0.5) + @patch("databricks.automl_runtime.forecast.prophet.forecast.fmin") @patch("databricks.automl_runtime.forecast.prophet.forecast.Trials") @patch("databricks.automl_runtime.forecast.prophet.forecast.partial") diff --git a/runtime/tests/automl_runtime/forecast/utils_test.py b/runtime/tests/automl_runtime/forecast/utils_test.py index 
e490f8ab..84bd94e0 100644 --- a/runtime/tests/automl_runtime/forecast/utils_test.py +++ b/runtime/tests/automl_runtime/forecast/utils_test.py @@ -22,7 +22,8 @@ from databricks.automl_runtime.forecast import DATE_OFFSET_KEYWORD_MAP from databricks.automl_runtime.forecast.utils import \ generate_cutoffs, get_validation_horizon, calculate_period_differences, \ - is_frequency_consistency, make_future_dataframe, make_single_future_dataframe + is_frequency_consistency, make_future_dataframe, make_single_future_dataframe, \ + generate_custom_cutoffs class TestGetValidationHorizon(unittest.TestCase): @@ -177,6 +178,80 @@ def test_generate_cutoffs_success_annualy(self): self.assertEqual([pd.Timestamp('2018-07-14 00:00:00'), pd.Timestamp('2019-07-14 00:00:00'), pd.Timestamp('2020-07-14 00:00:00')], cutoffs) +class TestTestGenerateCustomCutoffs(unittest.TestCase): + + def test_generate_custom_cutoffs_success_hourly(self): + df = pd.DataFrame( + pd.date_range(start="2020-07-01", periods=168, freq='h'), columns=["ds"] + ).rename_axis("y").reset_index() + expected_cutoffs = [pd.Timestamp('2020-07-07 13:00:00'), + pd.Timestamp('2020-07-07 14:00:00'), + pd.Timestamp('2020-07-07 15:00:00'), + pd.Timestamp('2020-07-07 16:00:00')] + cutoffs = generate_custom_cutoffs(df, horizon=7, unit="H", split_cutoff=pd.Timestamp('2020-07-07 13:00:00')) + self.assertEqual(expected_cutoffs, cutoffs) + + def test_generate_custom_cutoffs_success_daily(self): + df = pd.DataFrame( + pd.date_range(start="2020-07-01", end="2020-08-30", freq='d'), columns=["ds"] + ).rename_axis("y").reset_index() + cutoffs = generate_custom_cutoffs(df, horizon=7, unit="D", split_cutoff=pd.Timestamp('2020-08-21 00:00:00')) + self.assertEqual([pd.Timestamp('2020-08-21 00:00:00'), pd.Timestamp('2020-08-22 00:00:00'), pd.Timestamp('2020-08-23 00:00:00')], cutoffs) + + def test_generate_custom_cutoffs_success_small_horizon(self): + df = pd.DataFrame( + pd.date_range(start="2020-07-01", end="2020-08-30", freq='2d'), columns=["ds"] + ).rename_axis("y").reset_index() + cutoffs = generate_custom_cutoffs(df, horizon=1, unit="D", split_cutoff=pd.Timestamp('2020-08-26 00:00:00')) + self.assertEqual([pd.Timestamp('2020-08-27 00:00:00'), pd.Timestamp('2020-08-29 00:00:00')], cutoffs) + + def test_generate_custom_cutoffs_success_weekly(self): + df = pd.DataFrame( + pd.date_range(start="2020-07-01", periods=52, freq='W'), columns=["ds"] + ).rename_axis("y").reset_index() + cutoffs = generate_custom_cutoffs(df, horizon=7, unit="W", split_cutoff=pd.Timestamp('2021-04-25 00:00:00')) + self.assertEqual([pd.Timestamp('2021-04-25 00:00:00'), pd.Timestamp('2021-05-02 00:00:00'), pd.Timestamp('2021-05-09 00:00:00')], cutoffs) + + def test_generate_custom_cutoffs_success_monthly(self): + df = pd.DataFrame( + pd.date_range(start="2020-01-12", periods=24, freq=pd.DateOffset(months=1)), columns=["ds"] + ).rename_axis("y").reset_index() + cutoffs = generate_custom_cutoffs(df, horizon=7, unit="MS", split_cutoff=pd.Timestamp('2021-03-12 00:00:00')) + self.assertEqual([pd.Timestamp('2021-03-12 00:00:00'), pd.Timestamp('2021-04-12 00:00:00'), pd.Timestamp('2021-05-12 00:00:00')], cutoffs) + + def test_generate_custom_cutoffs_success_quaterly(self): + df = pd.DataFrame( + pd.date_range(start="2020-07-12", periods=9, freq=pd.DateOffset(months=3)), columns=["ds"] + ).rename_axis("y").reset_index() + cutoffs = generate_custom_cutoffs(df, horizon=7, unit="QS", split_cutoff=pd.Timestamp('2020-07-12 00:00:00')) + self.assertEqual([pd.Timestamp('2020-07-12 00:00:00'), 
pd.Timestamp('2020-10-12 00:00:00')], cutoffs) + + def test_generate_custom_cutoffs_success_annualy(self): + df = pd.DataFrame( + pd.date_range(start="2012-07-14", periods=10, freq=pd.DateOffset(years=1)), columns=["ds"] + ).rename_axis("y").reset_index() + cutoffs = generate_custom_cutoffs(df, horizon=7, unit="YS", split_cutoff=pd.Timestamp('2012-07-14 00:00:00')) + self.assertEqual([pd.Timestamp('2012-07-14 00:00:00'), pd.Timestamp('2013-07-14 00:00:00'), pd.Timestamp('2014-07-14 00:00:00')], cutoffs) + + def test_generate_custom_cutoffs_success_with_small_gaps(self): + df = pd.DataFrame( + pd.date_range(start="2020-07-01", periods=30, freq='3d'), columns=["ds"] + ).rename_axis("y").reset_index() + cutoffs = generate_custom_cutoffs(df, horizon=7, unit="D", split_cutoff=pd.Timestamp('2020-09-17 00:00:00')) + self.assertEqual([pd.Timestamp('2020-09-17 00:00:00'), + pd.Timestamp('2020-09-18 00:00:00'), + pd.Timestamp('2020-09-19 00:00:00')], cutoffs) + + def test_generate_custom_cutoffs_success_with_large_gaps(self): + df = pd.DataFrame( + pd.date_range(start="2020-07-01", periods=30, freq='9d'), columns=["ds"] + ).rename_axis("y").reset_index() + cutoffs = generate_custom_cutoffs(df, horizon=7, unit="D", split_cutoff=pd.Timestamp('2021-03-08 00:00:00')) + self.assertEqual([pd.Timestamp('2021-03-08 00:00:00'), + pd.Timestamp('2021-03-09 00:00:00'), + pd.Timestamp('2021-03-12 00:00:00')], cutoffs) + + class TestCalculatePeriodsAndFrequency(unittest.TestCase): def setUp(self) -> None: return super().setUp() From 0810c44618891b936a2451529681bbbde372f88d Mon Sep 17 00:00:00 2001 From: Ying Chen Date: Wed, 25 Sep 2024 13:01:05 -0700 Subject: [PATCH 02/11] [ML-44835] Add DeepAR mlflow model logging (#146) * Create mlflow pythonmodel for DeepAR * Fix predict bugs * Unit tests * Remove unused imports * Add dependencies to environment.txt * Fix torch version --- .../forecast/deepar/__init__.py | 0 .../automl_runtime/forecast/deepar/model.py | 137 ++++++++++++++++ runtime/databricks/automl_runtime/version.py | 2 +- runtime/environment.txt | 3 + runtime/requirements.txt | 3 + runtime/setup.py | 1 + .../forecast/deepar/__init__.py | 0 .../forecast/deepar/model_test.py | 154 ++++++++++++++++++ 8 files changed, 299 insertions(+), 1 deletion(-) create mode 100644 runtime/databricks/automl_runtime/forecast/deepar/__init__.py create mode 100644 runtime/databricks/automl_runtime/forecast/deepar/model.py create mode 100644 runtime/tests/automl_runtime/forecast/deepar/__init__.py create mode 100644 runtime/tests/automl_runtime/forecast/deepar/model_test.py diff --git a/runtime/databricks/automl_runtime/forecast/deepar/__init__.py b/runtime/databricks/automl_runtime/forecast/deepar/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/runtime/databricks/automl_runtime/forecast/deepar/model.py b/runtime/databricks/automl_runtime/forecast/deepar/model.py new file mode 100644 index 00000000..b831c90f --- /dev/null +++ b/runtime/databricks/automl_runtime/forecast/deepar/model.py @@ -0,0 +1,137 @@ +# +# Copyright (C) 2024 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +from typing import List, Optional + +import gluonts +import mlflow +import pandas as pd +from gluonts.dataset.pandas import PandasDataset +from gluonts.torch.model.predictor import PyTorchPredictor +from mlflow.utils.environment import _mlflow_conda_env + +from databricks.automl_runtime.forecast.model import ForecastModel, mlflow_forecast_log_model + +DEEPAR_CONDA_ENV = _mlflow_conda_env( + additional_pip_deps=[ + f"gluonts[torch]=={gluonts.__version__}", + f"pandas=={pd.__version__}", + ] +) + + +class DeepARModel(ForecastModel): + """ + DeepAR mlflow model wrapper for forecasting. + """ + + def __init__(self, model: PyTorchPredictor, horizon: int, num_samples: int, + target_col: str, time_col: str, + id_cols: Optional[List[str]] = None) -> None: + """ + Initialize the DeepAR mlflow Python model wrapper + :param model: DeepAR model + :param horizon: the number of periods to forecast forward + :param num_samples: the number of samples to draw from the distribution + :param target_col: the target column name + :param time_col: the time column name + :param id_cols: the column names of the identity columns for multi-series time series; None for single series + """ + + # TODO: combine id_cols in predict() to ts_id when there are multiple id_cols + if id_cols and len(id_cols) > 1: + raise NotImplementedError("Logging multiple id_cols for DeepAR in AutoML are not supported yet") + + super().__init__() + self._model = model + self._horizon = horizon + self._num_samples = num_samples + self._target_col = target_col + self._time_col = time_col + self._id_cols = id_cols + + @property + def model_env(self): + return DEEPAR_CONDA_ENV + + def predict(self, + context: mlflow.pyfunc.model.PythonModelContext, + model_input: pd.DataFrame) -> pd.DataFrame: + """ + Predict the future dataframe given the history dataframe + :param context: A :class:`~PythonModelContext` instance containing artifacts that the model + can use to perform inference. 
+ :param model_input: Input dataframe that contains the history data + :return: predicted pd.DataFrame that starts after the last timestamp in the input dataframe, + and predicts the horizon using the mean of the samples + """ + required_cols = [self._target_col, self._time_col] + if self._id_cols: + required_cols += self._id_cols + self._validate_cols(model_input, required_cols) + + forecast_sample_list = self.predict_samples(model_input, num_samples=self._num_samples) + + pred_df = pd.concat( + [ + forecast.mean_ts.rename('yhat').reset_index().assign(item_id=forecast.item_id) + for forecast in forecast_sample_list + ], + ignore_index=True + ) + + pred_df = pred_df.rename(columns={'index': self._time_col}) + if self._id_cols: + pred_df = pred_df.rename(columns={'item_id': self._id_cols[0]}) + else: + pred_df = pred_df.drop(columns='item_id') + + pred_df[self._time_col] = pred_df[self._time_col].dt.to_timestamp() + + return pred_df + + def predict_samples(self, + model_input: pd.DataFrame, + num_samples: int = None) -> List[gluonts.model.forecast.SampleForecast]: + """ + Predict the future samples given the history dataframe + :param model_input: Input dataframe that contains the history data + :param num_samples: the number of samples to draw from the distribution + :return: List of SampleForecast, where each SampleForecast contains num_samples sampled forecasts + """ + if num_samples is None: + num_samples = self._num_samples + + model_input = model_input.set_index(self._time_col) + if self._id_cols: + test_ds = PandasDataset.from_long_dataframe(model_input, target=self._target_col, + item_id=self._id_cols[0], unchecked=True) + else: + test_ds = PandasDataset(model_input, target=self._target_col) + + forecast_iter = self._model.predict(test_ds, num_samples=num_samples) + forecast_sample_list = list(forecast_iter) + + return forecast_sample_list + + +def mlflow_deepar_log_model(deepar_model: DeepARModel, + sample_input: pd.DataFrame = None) -> None: + """ + Log the DeepAR model to mlflow + :param deepar_model: DeepAR mlflow PythonModel wrapper + :param sample_input: sample input Dataframes for model inference + """ + mlflow_forecast_log_model(deepar_model, sample_input) diff --git a/runtime/databricks/automl_runtime/version.py b/runtime/databricks/automl_runtime/version.py index 574eeb2e..353f93e0 100644 --- a/runtime/databricks/automl_runtime/version.py +++ b/runtime/databricks/automl_runtime/version.py @@ -14,4 +14,4 @@ # limitations under the License. # -__version__ = "0.2.20.1" # pragma: no cover +__version__ = "0.2.20.2.dev0" # pragma: no cover diff --git a/runtime/environment.txt b/runtime/environment.txt index a52cc682..e286b7fb 100644 --- a/runtime/environment.txt +++ b/runtime/environment.txt @@ -2,8 +2,10 @@ # * Keep dependencies sorted. category_encoders==2.6.0 +gluonts[torch]==0.15.1 holidays==0.28 hyperopt==0.2.7 +lightning==2.0.1 mlflow==2.0.1 numpy==1.23.5 pandas==1.5.3 @@ -11,4 +13,5 @@ pmdarima==2.0.3 prophet==1.1.4 pyarrow==8.0.0 scikit-learn==1.1.1 +torch==2.0.1 wrapt==1.14.1 diff --git a/runtime/requirements.txt b/runtime/requirements.txt index c601c6de..d612c67b 100644 --- a/runtime/requirements.txt +++ b/runtime/requirements.txt @@ -2,6 +2,7 @@ # * Keep dependencies sorted. 
category_encoders +gluonts holidays hyperopt mlflow @@ -13,4 +14,6 @@ prophet pyarrow requests scikit-learn +torch +lightning wrapt diff --git a/runtime/setup.py b/runtime/setup.py index d379e94e..17f162c7 100644 --- a/runtime/setup.py +++ b/runtime/setup.py @@ -46,6 +46,7 @@ "databricks", "databricks.automl_runtime", "databricks.automl_runtime.forecast", + "databricks.automl_runtime.forecast.deepar", "databricks.automl_runtime.forecast.pmdarima", "databricks.automl_runtime.forecast.prophet", "databricks.automl_runtime.hyperopt", diff --git a/runtime/tests/automl_runtime/forecast/deepar/__init__.py b/runtime/tests/automl_runtime/forecast/deepar/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/runtime/tests/automl_runtime/forecast/deepar/model_test.py b/runtime/tests/automl_runtime/forecast/deepar/model_test.py new file mode 100644 index 00000000..f896688a --- /dev/null +++ b/runtime/tests/automl_runtime/forecast/deepar/model_test.py @@ -0,0 +1,154 @@ +# +# Copyright (C) 2024 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest + +import mlflow +import pandas as pd +import torch +import torch.nn as nn +from gluonts.dataset.field_names import FieldName +from gluonts.transform import InstanceSplitter, TestSplitSampler +from gluonts.torch.model.predictor import PyTorchPredictor + +from databricks.automl_runtime.forecast.deepar.model import ( + DeepARModel, mlflow_deepar_log_model, +) + + +class TestDeepARModel(unittest.TestCase): + @classmethod + def setUpClass(cls) -> None: + # Adapted from https://github.com/awslabs/gluonts/blob/dev/test/torch/model/test_torch_predictor.py + class RandomNetwork(nn.Module): + def __init__( + self, + prediction_length: int, + context_length: int, + ) -> None: + super().__init__() + self.prediction_length = prediction_length + self.context_length = context_length + self.net = nn.Linear(context_length, prediction_length) + torch.nn.init.uniform_(self.net.weight, -1.0, 1.0) + + def forward(self, past_target): + out = self.net(past_target.float()) + return out.unsqueeze(1) + + cls.context_length = 5 + cls.prediction_length = 5 + + cls.pred_net = RandomNetwork( + prediction_length=cls.context_length, context_length=cls.context_length + ) + + cls.transformation = InstanceSplitter( + target_field=FieldName.TARGET, + is_pad_field=FieldName.IS_PAD, + start_field=FieldName.START, + forecast_start_field=FieldName.FORECAST_START, + instance_sampler=TestSplitSampler(), + past_length=cls.context_length, + future_length=cls.prediction_length, + ) + + cls.model = PyTorchPredictor( + prediction_length=cls.prediction_length, + input_names=["past_target"], + prediction_net=cls.pred_net, + batch_size=16, + input_transform=cls.transformation, + device="cpu", + ) + + def test_model_save_and_load_single_series(self): + target_col = "sales" + time_col = "date" + + deepar_model = DeepARModel( + model=self.model, + horizon=self.prediction_length, + num_samples=1, + target_col=target_col, + time_col=time_col, + ) + + num_rows = 10 + 
sample_input = pd.concat( + [ + pd.to_datetime( + pd.Series(range(num_rows), name=time_col).apply( + lambda i: f"2020-10-{3 * i + 1}" + ) + ), + pd.Series(range(num_rows), name=target_col), + ], + axis=1, + ) + + with mlflow.start_run() as run: + mlflow_deepar_log_model(deepar_model, sample_input) + + run_id = run.info.run_id + loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") + + pred_df = loaded_model.predict(sample_input) + + assert pred_df.columns.tolist() == [time_col, "yhat"] + assert len(pred_df) == self.prediction_length + assert pred_df[time_col].min() > sample_input[time_col].max() + + def test_model_save_and_load_multi_series(self): + target_col = "sales" + time_col = "date" + id_col = "store" + + deepar_model = DeepARModel( + model=self.model, + horizon=self.prediction_length, + num_samples=1, + target_col=target_col, + time_col=time_col, + id_cols=[id_col], + ) + + num_rows = 10 + sample_input_base = pd.concat( + [ + pd.to_datetime( + pd.Series(range(num_rows), name=time_col).apply( + lambda i: f"2020-10-{3 * i + 1}" + ) + ), + pd.Series(range(num_rows), name=target_col), + ], + axis=1, + ) + sample_input = pd.concat([sample_input_base.copy(), sample_input_base.copy()], ignore_index=True) + sample_input[id_col] = [1] * num_rows + [2] * num_rows + + with mlflow.start_run() as run: + mlflow_deepar_log_model(deepar_model, sample_input) + + run_id = run.info.run_id + loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") + + pred_df = loaded_model.predict(sample_input) + + assert pred_df.columns.tolist() == [time_col, "yhat", id_col] + assert len(pred_df) == self.prediction_length * 2 + assert pred_df[time_col].min() > sample_input[time_col].max() From d04dd9ce967f91841da016e51946157e9c4b35fd Mon Sep 17 00:00:00 2001 From: Ying Chen Date: Fri, 27 Sep 2024 12:01:18 -0700 Subject: [PATCH 03/11] Remove self._target_col from required columns of DeepAR predict() (#147) --- runtime/databricks/automl_runtime/forecast/deepar/model.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/databricks/automl_runtime/forecast/deepar/model.py b/runtime/databricks/automl_runtime/forecast/deepar/model.py index b831c90f..feaaa8c9 100644 --- a/runtime/databricks/automl_runtime/forecast/deepar/model.py +++ b/runtime/databricks/automl_runtime/forecast/deepar/model.py @@ -77,7 +77,7 @@ def predict(self, :return: predicted pd.DataFrame that starts after the last timestamp in the input dataframe, and predicts the horizon using the mean of the samples """ - required_cols = [self._target_col, self._time_col] + required_cols = [self._time_col] if self._id_cols: required_cols += self._id_cols self._validate_cols(model_input, required_cols) From 4f165674a3c3911b72eed89b092c8864fdccd430 Mon Sep 17 00:00:00 2001 From: Ying Chen Date: Mon, 30 Sep 2024 13:59:25 -0700 Subject: [PATCH 04/11] Bump version 0.2.20.2.dev0 -> 0.2.20.2 (#149) --- runtime/databricks/automl_runtime/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/databricks/automl_runtime/version.py b/runtime/databricks/automl_runtime/version.py index 353f93e0..6ab0a0bc 100644 --- a/runtime/databricks/automl_runtime/version.py +++ b/runtime/databricks/automl_runtime/version.py @@ -14,4 +14,4 @@ # limitations under the License. 
# -__version__ = "0.2.20.2.dev0" # pragma: no cover +__version__ = "0.2.20.2" # pragma: no cover From f7dbeb34266ca41e8a9fa7d434de83e2f9235f99 Mon Sep 17 00:00:00 2001 From: Sabhya Chhabria Date: Tue, 1 Oct 2024 14:20:50 -0700 Subject: [PATCH 05/11] Add target col back to required cols in DeepAR predict() (#150) --- runtime/databricks/automl_runtime/forecast/deepar/model.py | 2 +- runtime/databricks/automl_runtime/version.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/databricks/automl_runtime/forecast/deepar/model.py b/runtime/databricks/automl_runtime/forecast/deepar/model.py index feaaa8c9..b831c90f 100644 --- a/runtime/databricks/automl_runtime/forecast/deepar/model.py +++ b/runtime/databricks/automl_runtime/forecast/deepar/model.py @@ -77,7 +77,7 @@ def predict(self, :return: predicted pd.DataFrame that starts after the last timestamp in the input dataframe, and predicts the horizon using the mean of the samples """ - required_cols = [self._time_col] + required_cols = [self._target_col, self._time_col] if self._id_cols: required_cols += self._id_cols self._validate_cols(model_input, required_cols) diff --git a/runtime/databricks/automl_runtime/version.py b/runtime/databricks/automl_runtime/version.py index 6ab0a0bc..bcb6931e 100644 --- a/runtime/databricks/automl_runtime/version.py +++ b/runtime/databricks/automl_runtime/version.py @@ -14,4 +14,4 @@ # limitations under the License. # -__version__ = "0.2.20.2" # pragma: no cover +__version__ = "0.2.20.3.dev" # pragma: no cover From f6de2019a48144108518f5fe079efafaa6c6c274 Mon Sep 17 00:00:00 2001 From: Sabhya Chhabria Date: Mon, 7 Oct 2024 10:55:22 -0700 Subject: [PATCH 06/11] Add databricks_automl to the conda env for arima and deepar (#151) * hardcode extra pip dependency * fix typo * add dep to arima and deepar * style fix * reqs test * test fix attemp 1 * deepar add dep test * prophet test updated * small rename * address nits * nit --- .../automl_runtime/forecast/deepar/model.py | 13 ++++--- .../automl_runtime/forecast/model.py | 10 ++++-- .../automl_runtime/forecast/pmdarima/model.py | 12 ++++--- .../automl_runtime/forecast/prophet/model.py | 6 ++-- .../forecast/deepar/model_test.py | 25 +++++++++++-- .../forecast/pmdarima/model_test.py | 28 +++++++++++++-- .../forecast/prophet/model_test.py | 36 +++++++++++++++---- 7 files changed, 106 insertions(+), 24 deletions(-) diff --git a/runtime/databricks/automl_runtime/forecast/deepar/model.py b/runtime/databricks/automl_runtime/forecast/deepar/model.py index b831c90f..cb26d1ba 100644 --- a/runtime/databricks/automl_runtime/forecast/deepar/model.py +++ b/runtime/databricks/automl_runtime/forecast/deepar/model.py @@ -23,12 +23,17 @@ from mlflow.utils.environment import _mlflow_conda_env from databricks.automl_runtime.forecast.model import ForecastModel, mlflow_forecast_log_model +from databricks.automl_runtime import version + + +DEEPAR_ADDITIONAL_PIP_DEPS = [ + f"gluonts[torch]=={gluonts.__version__}", + f"pandas=={pd.__version__}", + f"databricks-automl-runtime=={version.__version__}" +] DEEPAR_CONDA_ENV = _mlflow_conda_env( - additional_pip_deps=[ - f"gluonts[torch]=={gluonts.__version__}", - f"pandas=={pd.__version__}", - ] + additional_pip_deps=DEEPAR_ADDITIONAL_PIP_DEPS ) diff --git a/runtime/databricks/automl_runtime/forecast/model.py b/runtime/databricks/automl_runtime/forecast/model.py index 2ba5fe66..4b1aa25c 100644 --- a/runtime/databricks/automl_runtime/forecast/model.py +++ b/runtime/databricks/automl_runtime/forecast/model.py @@ -57,10 
+57,14 @@ def mlflow_forecast_log_model(forecast_model: ForecastModel, :param forecast_model: Forecast model wrapper :param sample_input: sample input Dataframes for model inference """ - # log the model without signature if infer_signature is failed. + # TODO: [ML-46185] we should not be logging without a signature since it cannot be registered to UC then try: signature = forecast_model.infer_signature(sample_input) except Exception: # noqa signature = None - mlflow.pyfunc.log_model("model", conda_env=forecast_model.model_env, - python_model=forecast_model, signature=signature) + mlflow.pyfunc.log_model( + artifact_path="model", + conda_env=forecast_model.model_env, + python_model=forecast_model, + signature=signature + ) diff --git a/runtime/databricks/automl_runtime/forecast/pmdarima/model.py b/runtime/databricks/automl_runtime/forecast/pmdarima/model.py index c99d5d01..64e5377d 100644 --- a/runtime/databricks/automl_runtime/forecast/pmdarima/model.py +++ b/runtime/databricks/automl_runtime/forecast/pmdarima/model.py @@ -28,13 +28,17 @@ from databricks.automl_runtime.forecast.model import ForecastModel, mlflow_forecast_log_model from databricks.automl_runtime.forecast.utils import calculate_period_differences, is_frequency_consistency, \ make_future_dataframe, make_single_future_dataframe +from databricks.automl_runtime import version +ARIMA_ADDITIONAL_PIP_DEPS = [ + f"pmdarima=={pmdarima.__version__}", + f"pandas=={pd.__version__}", + f"databricks-automl-runtime=={version.__version__}" +] + ARIMA_CONDA_ENV = _mlflow_conda_env( - additional_pip_deps=[ - f"pmdarima=={pmdarima.__version__}", - f"pandas=={pd.__version__}", - ] + additional_pip_deps=ARIMA_ADDITIONAL_PIP_DEPS ) diff --git a/runtime/databricks/automl_runtime/forecast/prophet/model.py b/runtime/databricks/automl_runtime/forecast/prophet/model.py index ed155a3f..f678ce6b 100644 --- a/runtime/databricks/automl_runtime/forecast/prophet/model.py +++ b/runtime/databricks/automl_runtime/forecast/prophet/model.py @@ -29,12 +29,14 @@ from databricks.automl_runtime.forecast.utils import is_quaterly_alias, make_future_dataframe -PROPHET_CONDA_ENV = _mlflow_conda_env( - additional_pip_deps=[ +PROPHET_ADDITIONAL_PIP_DEPS = [ f"prophet=={prophet.__version__}", f"cloudpickle=={cloudpickle.__version__}", f"databricks-automl-runtime=={version.__version__}", ] + +PROPHET_CONDA_ENV = _mlflow_conda_env( + additional_pip_deps=PROPHET_ADDITIONAL_PIP_DEPS ) diff --git a/runtime/tests/automl_runtime/forecast/deepar/model_test.py b/runtime/tests/automl_runtime/forecast/deepar/model_test.py index f896688a..bf9ac93a 100644 --- a/runtime/tests/automl_runtime/forecast/deepar/model_test.py +++ b/runtime/tests/automl_runtime/forecast/deepar/model_test.py @@ -25,7 +25,9 @@ from gluonts.torch.model.predictor import PyTorchPredictor from databricks.automl_runtime.forecast.deepar.model import ( - DeepARModel, mlflow_deepar_log_model, + DeepARModel, + mlflow_deepar_log_model, + DEEPAR_ADDITIONAL_PIP_DEPS ) @@ -104,8 +106,12 @@ def test_model_save_and_load_single_series(self): mlflow_deepar_log_model(deepar_model, sample_input) run_id = run.info.run_id - loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") + # check if all additional dependencies are logged + self._check_requirements(run_id) + + # load the model and predict + loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") pred_df = loaded_model.predict(sample_input) assert pred_df.columns.tolist() == [time_col, "yhat"] @@ -145,10 +151,23 @@ def 
test_model_save_and_load_multi_series(self): mlflow_deepar_log_model(deepar_model, sample_input) run_id = run.info.run_id - loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") + # check if all additional dependencies are logged + self._check_requirements(run_id) + + # load the model and predict + loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") pred_df = loaded_model.predict(sample_input) assert pred_df.columns.tolist() == [time_col, "yhat", id_col] assert len(pred_df) == self.prediction_length * 2 assert pred_df[time_col].min() > sample_input[time_col].max() + + def _check_requirements(self, run_id: str): + # read requirements.txt from the run + requirements_path = mlflow.artifacts.download_artifacts(f"runs:/{run_id}/model/requirements.txt") + with open(requirements_path, "r") as f: + requirements = f.read() + # check if all additional dependencies are logged + for dependency in DEEPAR_ADDITIONAL_PIP_DEPS: + self.assertIn(dependency, requirements, f"requirements.txt should contain {dependency} but got {requirements}") diff --git a/runtime/tests/automl_runtime/forecast/pmdarima/model_test.py b/runtime/tests/automl_runtime/forecast/pmdarima/model_test.py index f7616787..3c6d0d40 100644 --- a/runtime/tests/automl_runtime/forecast/pmdarima/model_test.py +++ b/runtime/tests/automl_runtime/forecast/pmdarima/model_test.py @@ -25,8 +25,13 @@ from mlflow.protos.databricks_pb2 import ErrorCode, INVALID_PARAMETER_VALUE from pmdarima.arima import ARIMA -from databricks.automl_runtime.forecast.pmdarima.model import ArimaModel, MultiSeriesArimaModel, AbstractArimaModel, \ - mlflow_arima_log_model +from databricks.automl_runtime.forecast.pmdarima.model import ( + ArimaModel, + MultiSeriesArimaModel, + AbstractArimaModel, + mlflow_arima_log_model, + ARIMA_ADDITIONAL_PIP_DEPS, +) class TestArimaModel(unittest.TestCase): @@ -438,6 +443,11 @@ def test_mlflow_arima_log_model(self): # Load the saved model from mlflow run_id = run.info.run_id + + # Check additonal requirements logged correctly + self._check_requirements(run_id) + + # Load the model loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") # Make sure can make forecasts with the saved model @@ -460,6 +470,11 @@ def test_mlflow_arima_log_model_multiseries(self): # Load the saved model from mlflow run_id = run.info.run_id + + # Check additonal requirements logged correctly + self._check_requirements(run_id) + + # Load the model loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") # Make sure can make forecasts with the saved model @@ -473,3 +488,12 @@ def test_mlflow_arima_log_model_multiseries(self): # Make sure can make forecasts for one-row dataframe loaded_model.predict(test_df[0:1]) + + def _check_requirements(self, run_id: str): + # read requirements.txt from the run + requirements_path = mlflow.artifacts.download_artifacts(f"runs:/{run_id}/model/requirements.txt") + with open(requirements_path, "r") as f: + requirements = f.read() + # check if all additional dependencies are logged + for dependency in ARIMA_ADDITIONAL_PIP_DEPS: + self.assertIn(dependency, requirements, f"requirements.txt should contain {dependency} but got {requirements}") diff --git a/runtime/tests/automl_runtime/forecast/prophet/model_test.py b/runtime/tests/automl_runtime/forecast/prophet/model_test.py index a4b321a8..4ce65dae 100644 --- a/runtime/tests/automl_runtime/forecast/prophet/model_test.py +++ b/runtime/tests/automl_runtime/forecast/prophet/model_test.py @@ -32,12 +32,22 @@ ProphetModel, OFFSET_ALIAS_MAP, 
DATE_OFFSET_KEYWORD_MAP, + PROPHET_ADDITIONAL_PIP_DEPS ) PROPHET_MODEL_JSON = '{"growth": "linear", "n_changepoints": 6, "specified_changepoints": false, "changepoint_range": 0.8, "yearly_seasonality": "auto", "weekly_seasonality": "auto", "daily_seasonality": "auto", "seasonality_mode": "additive", "seasonality_prior_scale": 10.0, "changepoint_prior_scale": 0.05, "holidays_prior_scale": 10.0, "mcmc_samples": 0, "interval_width": 0.8, "uncertainty_samples": 1000, "y_scale": 8.0, "logistic_floor": false, "country_holidays": null, "component_modes": {"additive": ["weekly", "additive_terms", "extra_regressors_additive", "holidays"], "multiplicative": ["multiplicative_terms", "extra_regressors_multiplicative"]}, "changepoints": "{\\"name\\":\\"ds\\",\\"index\\":[1,2,3,4,5,6],\\"data\\":[\\"2020-10-04T00:00:00.000\\",\\"2020-10-07T00:00:00.000\\",\\"2020-10-10T00:00:00.000\\",\\"2020-10-13T00:00:00.000\\",\\"2020-10-16T00:00:00.000\\",\\"2020-10-19T00:00:00.000\\"]}", "history_dates": "{\\"name\\":\\"ds\\",\\"index\\":[0,1,2,3,4,5,6,7,8],\\"data\\":[\\"2020-10-01T00:00:00.000\\",\\"2020-10-04T00:00:00.000\\",\\"2020-10-07T00:00:00.000\\",\\"2020-10-10T00:00:00.000\\",\\"2020-10-13T00:00:00.000\\",\\"2020-10-16T00:00:00.000\\",\\"2020-10-19T00:00:00.000\\",\\"2020-10-22T00:00:00.000\\",\\"2020-10-25T00:00:00.000\\"]}", "train_holiday_names": null, "start": 1601510400.0, "t_scale": 2073600.0, "holidays": null, "history": "{\\"schema\\":{\\"fields\\":[{\\"name\\":\\"ds\\",\\"type\\":\\"datetime\\"},{\\"name\\":\\"y\\",\\"type\\":\\"integer\\"},{\\"name\\":\\"floor\\",\\"type\\":\\"integer\\"},{\\"name\\":\\"t\\",\\"type\\":\\"number\\"},{\\"name\\":\\"y_scaled\\",\\"type\\":\\"number\\"}],\\"pandas_version\\":\\"1.4.0\\"},\\"data\\":[{\\"ds\\":\\"2020-10-01T00:00:00.000\\",\\"y\\":0,\\"floor\\":0,\\"t\\":0.0,\\"y_scaled\\":0.0},{\\"ds\\":\\"2020-10-04T00:00:00.000\\",\\"y\\":1,\\"floor\\":0,\\"t\\":0.125,\\"y_scaled\\":0.125},{\\"ds\\":\\"2020-10-07T00:00:00.000\\",\\"y\\":2,\\"floor\\":0,\\"t\\":0.25,\\"y_scaled\\":0.25},{\\"ds\\":\\"2020-10-10T00:00:00.000\\",\\"y\\":3,\\"floor\\":0,\\"t\\":0.375,\\"y_scaled\\":0.375},{\\"ds\\":\\"2020-10-13T00:00:00.000\\",\\"y\\":4,\\"floor\\":0,\\"t\\":0.5,\\"y_scaled\\":0.5},{\\"ds\\":\\"2020-10-16T00:00:00.000\\",\\"y\\":5,\\"floor\\":0,\\"t\\":0.625,\\"y_scaled\\":0.625},{\\"ds\\":\\"2020-10-19T00:00:00.000\\",\\"y\\":6,\\"floor\\":0,\\"t\\":0.75,\\"y_scaled\\":0.75},{\\"ds\\":\\"2020-10-22T00:00:00.000\\",\\"y\\":7,\\"floor\\":0,\\"t\\":0.875,\\"y_scaled\\":0.875},{\\"ds\\":\\"2020-10-25T00:00:00.000\\",\\"y\\":8,\\"floor\\":0,\\"t\\":1.0,\\"y_scaled\\":1.0}]}", "train_component_cols": "{\\"schema\\":{\\"fields\\":[{\\"name\\":\\"additive_terms\\",\\"type\\":\\"integer\\"},{\\"name\\":\\"weekly\\",\\"type\\":\\"integer\\"},{\\"name\\":\\"multiplicative_terms\\",\\"type\\":\\"integer\\"}],\\"pandas_version\\":\\"1.4.0\\"},\\"data\\":[{\\"additive_terms\\":1,\\"weekly\\":1,\\"multiplicative_terms\\":0},{\\"additive_terms\\":1,\\"weekly\\":1,\\"multiplicative_terms\\":0},{\\"additive_terms\\":1,\\"weekly\\":1,\\"multiplicative_terms\\":0},{\\"additive_terms\\":1,\\"weekly\\":1,\\"multiplicative_terms\\":0},{\\"additive_terms\\":1,\\"weekly\\":1,\\"multiplicative_terms\\":0},{\\"additive_terms\\":1,\\"weekly\\":1,\\"multiplicative_terms\\":0}]}", "changepoints_t": [0.125, 0.25, 0.375, 0.5, 0.625, 0.75], "seasonalities": [["weekly"], {"weekly": {"period": 7, "fourier_order": 3, "prior_scale": 10.0, "mode": "additive", "condition_name": null}}], 
"extra_regressors": [[], {}], "fit_kwargs": {}, "params": {"lp__": [[202.053]], "k": [[1.19777]], "m": [[0.0565623]], "delta": [[-0.86152, 0.409957, -0.103241, 0.528979, 0.535181, -0.509356]], "sigma_obs": [[2.53056e-13]], "beta": [[-0.00630566, 0.016248, 0.0318587, -0.068705, 0.0029986, -0.00410522]], "trend": [[0.0565623, 0.206283, 0.248314, 0.341589, 0.421959, 0.568452, 0.781842, 0.931562, 1.08128]]}, "__prophet_version": "1.1.1"}' - -class TestProphetModel(unittest.TestCase): +class BaseProphetModelTest(unittest.TestCase): + def _check_requirements(self, run_id: str): + # read requirements.txt from the run + requirements_path = mlflow.artifacts.download_artifacts(f"runs:/{run_id}/model/requirements.txt") + with open(requirements_path, "r") as f: + requirements = f.read() + # check if all additional dependencies are logged + for dependency in PROPHET_ADDITIONAL_PIP_DEPS: + self.assertIn(dependency, requirements, f"requirements.txt should contain {dependency} but got {requirements}") + +class TestProphetModel(BaseProphetModelTest): @classmethod def setUpClass(cls) -> None: num_rows = 9 @@ -74,8 +84,13 @@ def test_model_save_and_load(self): with mlflow.start_run() as run: mlflow_prophet_log_model(prophet_model) - # Load the saved model from mlflow + run_id = run.info.run_id + + # Check additonal requirements logged correctly + self._check_requirements(run_id) + + # Load the saved model from mlflow prophet_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") # Check the prediction with the saved model @@ -144,7 +159,7 @@ def test_validate_predict_cols(self): assert e.value.error_code == ErrorCode.Name(INTERNAL_ERROR) -class TestMultiSeriesProphetModel(unittest.TestCase): +class TestMultiSeriesProphetModel(BaseProphetModelTest): @classmethod def setUpClass(cls) -> None: cls.model_json = PROPHET_MODEL_JSON @@ -178,8 +193,13 @@ def test_model_save_and_load(self): with mlflow.start_run() as run: mlflow_prophet_log_model(self.prophet_model, sample_input=test_df) - # Load the saved model from mlflow + run_id = run.info.run_id + + # Check additonal requirements logged correctly + self._check_requirements(run_id) + + # Load the saved model from mlflow loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") # Check the prediction with the saved model @@ -239,9 +259,13 @@ def test_model_save_and_load_multi_ids(self): ) with mlflow.start_run() as run: mlflow_prophet_log_model(prophet_model, sample_input=test_df) + + run_id = run.info.run_id + + # Check additonal requirements logged correctly + self._check_requirements(run_id) # Load the saved model from mlflow - run_id = run.info.run_id loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") # Check the prediction with the saved model From f803112485f503554a21fad33609ad8f0f96b13e Mon Sep 17 00:00:00 2001 From: Ying Chen Date: Wed, 9 Oct 2024 11:19:46 -0700 Subject: [PATCH 07/11] Fill time steps for single time-series in DeepAR predict (#152) * Fill missing time steps in DeepAR predict * Add single series test * Fix multi series missing time steps and add test * nit * Support multiple id_cols and add test case * Bump version * docstring * Address comments and add test --- .../automl_runtime/forecast/deepar/model.py | 28 ++--- .../automl_runtime/forecast/deepar/utils.py | 55 +++++++++ runtime/databricks/automl_runtime/version.py | 2 +- .../forecast/deepar/model_test.py | 67 +++++++++-- .../forecast/deepar/utils_test.py | 110 ++++++++++++++++++ 5 files changed, 237 insertions(+), 25 deletions(-) create mode 100644 
runtime/databricks/automl_runtime/forecast/deepar/utils.py create mode 100644 runtime/tests/automl_runtime/forecast/deepar/utils_test.py diff --git a/runtime/databricks/automl_runtime/forecast/deepar/model.py b/runtime/databricks/automl_runtime/forecast/deepar/model.py index cb26d1ba..3af2e842 100644 --- a/runtime/databricks/automl_runtime/forecast/deepar/model.py +++ b/runtime/databricks/automl_runtime/forecast/deepar/model.py @@ -22,9 +22,9 @@ from gluonts.torch.model.predictor import PyTorchPredictor from mlflow.utils.environment import _mlflow_conda_env -from databricks.automl_runtime.forecast.model import ForecastModel, mlflow_forecast_log_model from databricks.automl_runtime import version - +from databricks.automl_runtime.forecast.model import ForecastModel, mlflow_forecast_log_model +from databricks.automl_runtime.forecast.deepar.utils import set_index_and_fill_missing_time_steps DEEPAR_ADDITIONAL_PIP_DEPS = [ f"gluonts[torch]=={gluonts.__version__}", @@ -42,26 +42,25 @@ class DeepARModel(ForecastModel): DeepAR mlflow model wrapper for forecasting. """ - def __init__(self, model: PyTorchPredictor, horizon: int, num_samples: int, + def __init__(self, model: PyTorchPredictor, horizon: int, frequency: str, + num_samples: int, target_col: str, time_col: str, id_cols: Optional[List[str]] = None) -> None: """ Initialize the DeepAR mlflow Python model wrapper :param model: DeepAR model :param horizon: the number of periods to forecast forward + :param frequency: the frequency of the time series :param num_samples: the number of samples to draw from the distribution :param target_col: the target column name :param time_col: the time column name :param id_cols: the column names of the identity columns for multi-series time series; None for single series """ - # TODO: combine id_cols in predict() to ts_id when there are multiple id_cols - if id_cols and len(id_cols) > 1: - raise NotImplementedError("Logging multiple id_cols for DeepAR in AutoML are not supported yet") - super().__init__() self._model = model self._horizon = horizon + self._frequency = frequency self._num_samples = num_samples self._target_col = target_col self._time_col = time_col @@ -99,7 +98,8 @@ def predict(self, pred_df = pred_df.rename(columns={'index': self._time_col}) if self._id_cols: - pred_df = pred_df.rename(columns={'item_id': self._id_cols[0]}) + id_col_name = '-'.join(self._id_cols) + pred_df = pred_df.rename(columns={'item_id': id_col_name}) else: pred_df = pred_df.drop(columns='item_id') @@ -119,12 +119,12 @@ def predict_samples(self, if num_samples is None: num_samples = self._num_samples - model_input = model_input.set_index(self._time_col) - if self._id_cols: - test_ds = PandasDataset.from_long_dataframe(model_input, target=self._target_col, - item_id=self._id_cols[0], unchecked=True) - else: - test_ds = PandasDataset(model_input, target=self._target_col) + model_input_transformed = set_index_and_fill_missing_time_steps(model_input, + self._time_col, + self._frequency, + self._id_cols) + + test_ds = PandasDataset(model_input_transformed, target=self._target_col) forecast_iter = self._model.predict(test_ds, num_samples=num_samples) forecast_sample_list = list(forecast_iter) diff --git a/runtime/databricks/automl_runtime/forecast/deepar/utils.py b/runtime/databricks/automl_runtime/forecast/deepar/utils.py new file mode 100644 index 00000000..b69c83e2 --- /dev/null +++ b/runtime/databricks/automl_runtime/forecast/deepar/utils.py @@ -0,0 +1,55 @@ +# +# Copyright (C) 2024 Databricks, Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from typing import List, Optional + +import pandas as pd + + +def set_index_and_fill_missing_time_steps(df: pd.DataFrame, time_col: str, + frequency: str, + id_cols: Optional[List[str]] = None): + """ + Transform the input dataframe to an acceptable format for the GluonTS library. + + - Set the time column as the index + - Impute missing time steps between the min and max time steps + + :param df: the input dataframe that contains time_col + :param time_col: time column name + :param frequency: the frequency of the time series + :param id_cols: the column names of the identity columns for multi-series time series; None for single series + :return: single-series - transformed dataframe; + multi-series - dictionary of transformed dataframes, each key is the (concatenated) id of the time series + """ + total_min, total_max = df[time_col].min(), df[time_col].max() + new_index_full = pd.date_range(total_min, total_max, freq=frequency) + + if id_cols is not None: + df_dict = {} + for grouped_id, grouped_df in df.groupby(id_cols): + if isinstance(grouped_id, tuple): + ts_id = "-".join([str(x) for x in grouped_id]) + else: + ts_id = str(grouped_id) + df_dict[ts_id] = (grouped_df.set_index(time_col).sort_index() + .reindex(new_index_full).drop(id_cols, axis=1)) + + return df_dict + + df = df.set_index(time_col).sort_index() + + # Fill in missing time steps between the min and max time steps + return df.reindex(new_index_full) diff --git a/runtime/databricks/automl_runtime/version.py b/runtime/databricks/automl_runtime/version.py index bcb6931e..2d1bf413 100644 --- a/runtime/databricks/automl_runtime/version.py +++ b/runtime/databricks/automl_runtime/version.py @@ -14,4 +14,4 @@ # limitations under the License. 
# -__version__ = "0.2.20.3.dev" # pragma: no cover +__version__ = "0.2.20.3" # pragma: no cover diff --git a/runtime/tests/automl_runtime/forecast/deepar/model_test.py b/runtime/tests/automl_runtime/forecast/deepar/model_test.py index bf9ac93a..1d3ed3e3 100644 --- a/runtime/tests/automl_runtime/forecast/deepar/model_test.py +++ b/runtime/tests/automl_runtime/forecast/deepar/model_test.py @@ -84,6 +84,7 @@ def test_model_save_and_load_single_series(self): deepar_model = DeepARModel( model=self.model, horizon=self.prediction_length, + frequency="d", num_samples=1, target_col=target_col, time_col=time_col, @@ -114,9 +115,9 @@ def test_model_save_and_load_single_series(self): loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") pred_df = loaded_model.predict(sample_input) - assert pred_df.columns.tolist() == [time_col, "yhat"] - assert len(pred_df) == self.prediction_length - assert pred_df[time_col].min() > sample_input[time_col].max() + self.assertEqual(pred_df.columns.tolist(), [time_col, "yhat"]) + self.assertEqual(len(pred_df), self.prediction_length) + self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max()) def test_model_save_and_load_multi_series(self): target_col = "sales" @@ -127,25 +128,26 @@ def test_model_save_and_load_multi_series(self): model=self.model, horizon=self.prediction_length, num_samples=1, + frequency="d", target_col=target_col, time_col=time_col, id_cols=[id_col], ) - num_rows = 10 + num_rows_per_ts = 10 sample_input_base = pd.concat( [ pd.to_datetime( - pd.Series(range(num_rows), name=time_col).apply( + pd.Series(range(num_rows_per_ts), name=time_col).apply( lambda i: f"2020-10-{3 * i + 1}" ) ), - pd.Series(range(num_rows), name=target_col), + pd.Series(range(num_rows_per_ts), name=target_col), ], axis=1, ) sample_input = pd.concat([sample_input_base.copy(), sample_input_base.copy()], ignore_index=True) - sample_input[id_col] = [1] * num_rows + [2] * num_rows + sample_input[id_col] = [1] * num_rows_per_ts + [2] * num_rows_per_ts with mlflow.start_run() as run: mlflow_deepar_log_model(deepar_model, sample_input) @@ -155,13 +157,58 @@ def test_model_save_and_load_multi_series(self): # check if all additional dependencies are logged self._check_requirements(run_id) + # load the model and predict + loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") + + pred_df = loaded_model.predict(sample_input) + + self.assertEqual(pred_df.columns.tolist(), [time_col, "yhat", id_col]) + self.assertEqual(len(pred_df), self.prediction_length * 2) + self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max()) + + def test_model_save_and_load_multi_series_multi_id_cols(self): + target_col = "sales" + time_col = "date" + id_cols = ["store", "dept"] + + deepar_model = DeepARModel( + model=self.model, + horizon=self.prediction_length, + num_samples=1, + frequency="d", + target_col=target_col, + time_col=time_col, + id_cols=id_cols, + ) + + num_rows_per_ts = 5 + sample_input_base = pd.concat( + [ + pd.to_datetime( + pd.Series(range(num_rows_per_ts), name=time_col).apply( + lambda i: f"2020-10-{3 * i + 1}" + ) + ), + pd.Series(range(num_rows_per_ts), name=target_col), + ], + axis=1, + ) + sample_input = pd.concat([sample_input_base.copy(), sample_input_base.copy(), + sample_input_base.copy(), sample_input_base.copy(),], ignore_index=True) + sample_input[id_cols[0]] = ['A'] * (2 * num_rows_per_ts) + ['B'] * (2 * num_rows_per_ts) + sample_input[id_cols[1]] = (['X'] * num_rows_per_ts + ['Y'] * num_rows_per_ts) * 2 + + with 
mlflow.start_run() as run: + mlflow_deepar_log_model(deepar_model, sample_input) + run_id = run.info.run_id + # load the model and predict loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model") pred_df = loaded_model.predict(sample_input) - assert pred_df.columns.tolist() == [time_col, "yhat", id_col] - assert len(pred_df) == self.prediction_length * 2 - assert pred_df[time_col].min() > sample_input[time_col].max() + self.assertEqual(pred_df.columns.tolist(), [time_col, "yhat", "store-dept"]) + self.assertEqual(len(pred_df), self.prediction_length * 4) + self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max()) def _check_requirements(self, run_id: str): # read requirements.txt from the run diff --git a/runtime/tests/automl_runtime/forecast/deepar/utils_test.py b/runtime/tests/automl_runtime/forecast/deepar/utils_test.py new file mode 100644 index 00000000..b3538781 --- /dev/null +++ b/runtime/tests/automl_runtime/forecast/deepar/utils_test.py @@ -0,0 +1,110 @@ +# +# Copyright (C) 2024 Databricks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import unittest + +import pandas as pd + +from databricks.automl_runtime.forecast.deepar.utils import set_index_and_fill_missing_time_steps + + +class TestDeepARUtils(unittest.TestCase): + def test_single_series_filled(self): + target_col = "sales" + time_col = "date" + num_rows = 10 + + base_df = pd.concat( + [ + pd.to_datetime( + pd.Series(range(num_rows), name=time_col).apply( + lambda i: f"2020-10-{i + 1}" + ) + ), + pd.Series(range(num_rows), name=target_col), + ], + axis=1, + ) + dropped_df = base_df.drop([4, 5]).reset_index(drop=True) + + transformed_df = set_index_and_fill_missing_time_steps(dropped_df, time_col, "D") + + expected_df = base_df.copy() + expected_df.loc[[4, 5], target_col] = float('nan') + expected_df = expected_df.set_index(time_col).rename_axis(None).asfreq("D") + + pd.testing.assert_frame_equal(transformed_df, expected_df) + + def test_multi_series_filled(self): + target_col = "sales" + time_col = "date" + id_col = "store" + + num_rows_per_ts = 10 + base_df = pd.concat( + [ + pd.to_datetime( + pd.Series(range(num_rows_per_ts), name=time_col).apply( + lambda i: f"2020-10-{i + 1}" + ) + ), + pd.Series(range(num_rows_per_ts), name=target_col), + ], + axis=1, + ) + dropped_base_df = base_df.drop([4, 5]).reset_index(drop=True) + dropped_df = pd.concat([dropped_base_df.copy(), dropped_base_df.copy()], ignore_index=True) + dropped_df[id_col] = [1] * (num_rows_per_ts - 2) + [2] * (num_rows_per_ts - 2) + + transformed_df_dict = set_index_and_fill_missing_time_steps(dropped_df, time_col, "D", id_cols=[id_col]) + self.assertEqual(transformed_df_dict.keys(), {"1", "2"}) + + expected_first_df = base_df.copy() + expected_first_df.loc[[4, 5], target_col] = float('nan') + expected_first_df = expected_first_df.set_index(time_col).rename_axis(None).asfreq("D") + + pd.testing.assert_frame_equal(transformed_df_dict["1"], expected_first_df) + + def test_multi_series_multi_id_cols_filled(self): + target_col = 
"sales" + time_col = "date" + id_cols = ["store", "dept"] + + num_rows_per_ts = 10 + base_df = pd.concat( + [ + pd.to_datetime( + pd.Series(range(num_rows_per_ts), name=time_col).apply( + lambda i: f"2020-10-{i + 1}" + ) + ), + pd.Series(range(num_rows_per_ts), name=target_col), + ], + axis=1, + ) + dropped_base_df = base_df.drop([4, 5]).reset_index(drop=True) + dropped_df = pd.concat([dropped_base_df.copy(), dropped_base_df.copy(), + dropped_base_df.copy(), dropped_base_df.copy()], ignore_index=True) + dropped_df[id_cols[0]] = ([1] * (num_rows_per_ts - 2) + [2] * (num_rows_per_ts - 2)) * 2 + dropped_df[id_cols[1]] = [1] * (2 * (num_rows_per_ts - 2)) + [2] * (2 * (num_rows_per_ts - 2)) + + transformed_df_dict = set_index_and_fill_missing_time_steps(dropped_df, time_col, "D", id_cols=id_cols) + self.assertEqual(transformed_df_dict.keys(), {"1-1", "1-2", "2-1", "2-2"}) + + expected_first_df = base_df.copy() + expected_first_df.loc[[4, 5], target_col] = float('nan') + expected_first_df = expected_first_df.set_index(time_col).rename_axis(None).asfreq("D") + + pd.testing.assert_frame_equal(transformed_df_dict["1-1"], expected_first_df) From 0c71e51435e7e1a7b2fa6eaf5049016908243bd6 Mon Sep 17 00:00:00 2001 From: Ying Chen Date: Tue, 12 Nov 2024 16:34:35 -0800 Subject: [PATCH 08/11] Fix the frequency for DeepAR PandasDataset (#153) * Fix the frequency for DeepAR PandasDataset * Bump version to 0.2.20.4.dev0 * Bump version to 0.2.20.4 --- runtime/databricks/automl_runtime/forecast/deepar/model.py | 2 +- runtime/databricks/automl_runtime/version.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/databricks/automl_runtime/forecast/deepar/model.py b/runtime/databricks/automl_runtime/forecast/deepar/model.py index 3af2e842..c92aa8b8 100644 --- a/runtime/databricks/automl_runtime/forecast/deepar/model.py +++ b/runtime/databricks/automl_runtime/forecast/deepar/model.py @@ -124,7 +124,7 @@ def predict_samples(self, self._frequency, self._id_cols) - test_ds = PandasDataset(model_input_transformed, target=self._target_col) + test_ds = PandasDataset(model_input_transformed, target=self._target_col, freq=self._frequency) forecast_iter = self._model.predict(test_ds, num_samples=num_samples) forecast_sample_list = list(forecast_iter) diff --git a/runtime/databricks/automl_runtime/version.py b/runtime/databricks/automl_runtime/version.py index 2d1bf413..2165ee0f 100644 --- a/runtime/databricks/automl_runtime/version.py +++ b/runtime/databricks/automl_runtime/version.py @@ -14,4 +14,4 @@ # limitations under the License. 
# -__version__ = "0.2.20.3" # pragma: no cover +__version__ = "0.2.20.4" # pragma: no cover From de71cd863e13790537dc1dd25c3e09df5bc09102 Mon Sep 17 00:00:00 2001 From: Ying Chen Date: Fri, 15 Nov 2024 13:23:05 -0800 Subject: [PATCH 09/11] Fix weekly date range bug for deepar (#154) * Fix weekly date range bug for deepar * Fix import --- .../automl_runtime/forecast/deepar/utils.py | 7 ++++ runtime/databricks/automl_runtime/version.py | 2 +- .../forecast/deepar/utils_test.py | 35 +++++++++++++++++++ 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/runtime/databricks/automl_runtime/forecast/deepar/utils.py b/runtime/databricks/automl_runtime/forecast/deepar/utils.py index b69c83e2..57b6690c 100644 --- a/runtime/databricks/automl_runtime/forecast/deepar/utils.py +++ b/runtime/databricks/automl_runtime/forecast/deepar/utils.py @@ -35,6 +35,13 @@ def set_index_and_fill_missing_time_steps(df: pd.DataFrame, time_col: str, multi-series - dictionary of transformed dataframes, each key is the (concatenated) id of the time series """ total_min, total_max = df[time_col].min(), df[time_col].max() + + # We need to adjust the frequency for pd.date_range if it is weekly, + # otherwise it would always be "W-SUN" + if frequency.upper() == "W": + weekday_name = total_min.strftime("%a").upper() # e.g., "FRI" + frequency = f"W-{weekday_name}" + new_index_full = pd.date_range(total_min, total_max, freq=frequency) if id_cols is not None: diff --git a/runtime/databricks/automl_runtime/version.py b/runtime/databricks/automl_runtime/version.py index 2165ee0f..67b3bb34 100644 --- a/runtime/databricks/automl_runtime/version.py +++ b/runtime/databricks/automl_runtime/version.py @@ -14,4 +14,4 @@ # limitations under the License. # -__version__ = "0.2.20.4" # pragma: no cover +__version__ = "0.2.20.5.dev0" # pragma: no cover diff --git a/runtime/tests/automl_runtime/forecast/deepar/utils_test.py b/runtime/tests/automl_runtime/forecast/deepar/utils_test.py index b3538781..c46eae15 100644 --- a/runtime/tests/automl_runtime/forecast/deepar/utils_test.py +++ b/runtime/tests/automl_runtime/forecast/deepar/utils_test.py @@ -108,3 +108,38 @@ def test_multi_series_multi_id_cols_filled(self): expected_first_df = expected_first_df.set_index(time_col).rename_axis(None).asfreq("D") pd.testing.assert_frame_equal(transformed_df_dict["1-1"], expected_first_df) + + def test_single_series_week_day_index(self): + target_col = "sales" + time_col = "date" + num_weeks = 10 + + # Starting from first Friday in 2020 + base_dates = pd.date_range( + start='2020-01-03', # First Friday of 2020 + periods=num_weeks, + freq='W-FRI' # Weekly frequency starting Friday + ) + + base_df = pd.DataFrame({ + time_col: base_dates, + target_col: range(num_weeks) + }) + + # Create a dataframe with missing weeks (drop weeks 3 and 4) + dropped_df = base_df.drop([3, 4]).reset_index(drop=True) + + # Transform the dataframe + transformed_df = set_index_and_fill_missing_time_steps( + dropped_df, + time_col, + "W" # Weekly frequency **without** specifying Friday + ) + + # Create expected dataframe + expected_df = base_df.copy() + expected_df.loc[[3, 4], target_col] = float('nan') + expected_df = expected_df.set_index(time_col).rename_axis(None).asfreq("W-FRI") + + # Assert equality + pd.testing.assert_frame_equal(transformed_df, expected_df) From a65242bf0f3936730c48c17d8f0ec9eff172b9c4 Mon Sep 17 00:00:00 2001 From: Ying Chen Date: Mon, 18 Nov 2024 09:57:56 -0800 Subject: [PATCH 10/11] Take average to aggregate duplicate time_col for DeepAR (#155) * Take 
average to aggregate duplicate time_col for DeepAR * nit * clean up unused import * nit in test * fix * bump 0.2.20.5.dev1 -> 0.2.20.5 --- .../automl_runtime/forecast/deepar/model.py | 7 +++ runtime/databricks/automl_runtime/version.py | 2 +- .../forecast/deepar/model_test.py | 63 ++++++++++++++++--- 3 files changed, 63 insertions(+), 9 deletions(-) diff --git a/runtime/databricks/automl_runtime/forecast/deepar/model.py b/runtime/databricks/automl_runtime/forecast/deepar/model.py index c92aa8b8..4ce75c9f 100644 --- a/runtime/databricks/automl_runtime/forecast/deepar/model.py +++ b/runtime/databricks/automl_runtime/forecast/deepar/model.py @@ -119,6 +119,13 @@ def predict_samples(self, if num_samples is None: num_samples = self._num_samples + # Group by the time column in case there are multiple rows for each time column, + # for example, the user didn't provide all the identity columns for a multi-series dataset + group_cols = [self._time_col] + if self._id_cols: + group_cols += self._id_cols + model_input = model_input.groupby(group_cols).agg({self._target_col: "mean"}).reset_index() + model_input_transformed = set_index_and_fill_missing_time_steps(model_input, self._time_col, self._frequency, diff --git a/runtime/databricks/automl_runtime/version.py b/runtime/databricks/automl_runtime/version.py index 67b3bb34..d1e4e23a 100644 --- a/runtime/databricks/automl_runtime/version.py +++ b/runtime/databricks/automl_runtime/version.py @@ -14,4 +14,4 @@ # limitations under the License. # -__version__ = "0.2.20.5.dev0" # pragma: no cover +__version__ = "0.2.20.5" # pragma: no cover diff --git a/runtime/tests/automl_runtime/forecast/deepar/model_test.py b/runtime/tests/automl_runtime/forecast/deepar/model_test.py index 1d3ed3e3..d534d9b8 100644 --- a/runtime/tests/automl_runtime/forecast/deepar/model_test.py +++ b/runtime/tests/automl_runtime/forecast/deepar/model_test.py @@ -77,6 +77,15 @@ def forward(self, past_target): device="cpu", ) + def _check_requirements(self, run_id: str): + # read requirements.txt from the run + requirements_path = mlflow.artifacts.download_artifacts(f"runs:/{run_id}/model/requirements.txt") + with open(requirements_path, "r") as f: + requirements = f.read() + # check if all additional dependencies are logged + for dependency in DEEPAR_ADDITIONAL_PIP_DEPS: + self.assertIn(dependency, requirements, f"requirements.txt should contain {dependency} but got {requirements}") + def test_model_save_and_load_single_series(self): target_col = "sales" time_col = "date" @@ -210,11 +219,49 @@ def test_model_save_and_load_multi_series_multi_id_cols(self): self.assertEqual(len(pred_df), self.prediction_length * 4) self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max()) - def _check_requirements(self, run_id: str): - # read requirements.txt from the run - requirements_path = mlflow.artifacts.download_artifacts(f"runs:/{run_id}/model/requirements.txt") - with open(requirements_path, "r") as f: - requirements = f.read() - # check if all additional dependencies are logged - for dependency in DEEPAR_ADDITIONAL_PIP_DEPS: - self.assertIn(dependency, requirements, f"requirements.txt should contain {dependency} but got {requirements}") + def test_model_prediction_with_duplicate_timestamps(self): + """ + Test that the model correctly handles and averages multiple rows with the same timestamp + when identity columns are not provided. 
+ """
+ target_col = "sales"
+ time_col = "date"
+
+ deepar_model = DeepARModel(
+ model=self.model,
+ horizon=self.prediction_length,
+ frequency="d",
+ num_samples=1,
+ target_col=target_col,
+ time_col=time_col,
+ )
+
+ # Create sample input with duplicate timestamps
+ dates = pd.to_datetime([
+ "2020-10-01", "2020-10-01", # duplicate date with different values
+ "2020-10-04", "2020-10-04", "2020-10-04", # triple duplicate
+ "2020-10-07" # single entry
+ ])
+
+ sales = [10, 20, # should average to 15
+ 30, 60, 90, # should average to 60
+ 100] # single value stays 100
+
+ sample_input = pd.DataFrame({
+ time_col: dates,
+ target_col: sales
+ })
+
+ with mlflow.start_run() as run:
+ mlflow_deepar_log_model(deepar_model, sample_input)
+
+ run_id = run.info.run_id
+
+ # Load the model and predict
+ loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model")
+ pred_df = loaded_model.predict(sample_input)
+
+ # Verify the prediction output format
+ self.assertEqual(pred_df.columns.tolist(), [time_col, "yhat"])
+ self.assertEqual(len(pred_df), self.prediction_length)
+ self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max())

From c799c100b3751d2b274449f422cd89ee9449f740 Mon Sep 17 00:00:00 2001
From: Lan Zhang
Date: Fri, 24 Jan 2025 17:36:21 -0800
Subject: [PATCH 11/11] fix

--- .../automl_runtime/forecast/deepar/model.py | 2 +- .../forecast/deepar/model_test.py | 41 +++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/runtime/databricks/automl_runtime/forecast/deepar/model.py b/runtime/databricks/automl_runtime/forecast/deepar/model.py
index 4ce75c9f..92636406 100644
--- a/runtime/databricks/automl_runtime/forecast/deepar/model.py
+++ b/runtime/databricks/automl_runtime/forecast/deepar/model.py
@@ -131,7 +131,7 @@ def predict_samples(self, self._frequency, self._id_cols)
- test_ds = PandasDataset(model_input_transformed, target=self._target_col, freq=self._frequency)
+ test_ds = PandasDataset(model_input_transformed, target=self._target_col)
 forecast_iter = self._model.predict(test_ds, num_samples=num_samples)
 forecast_sample_list = list(forecast_iter)
diff --git a/runtime/tests/automl_runtime/forecast/deepar/model_test.py b/runtime/tests/automl_runtime/forecast/deepar/model_test.py
index d534d9b8..44894b93 100644
--- a/runtime/tests/automl_runtime/forecast/deepar/model_test.py
+++ b/runtime/tests/automl_runtime/forecast/deepar/model_test.py
@@ -265,3 +265,44 @@ def test_model_prediction_with_duplicate_timestamps(self):
 self.assertEqual(pred_df.columns.tolist(), [time_col, "yhat"])
 self.assertEqual(len(pred_df), self.prediction_length)
 self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max())
+
+ def test_model_prediction_with_monthly_data(self):
+ target_col = "sales"
+ time_col = "date"
+
+ deepar_model = DeepARModel(
+ model=self.model,
+ horizon=self.prediction_length,
+ frequency="MS",
+ num_samples=1,
+ target_col=target_col,
+ time_col=time_col,
+ )
+
+ # Create sample input with monthly timestamps
+ dates = pd.to_datetime([
+ "2020-10-01", "2020-11-01", "2020-12-01",
+ "2021-01-01", "2021-02-01", "2021-03-01"
+ ])
+
+ sales = [10, 20, 30,
+ 60, 90, 100]
+
+ sample_input = pd.DataFrame({
+ time_col: dates,
+ target_col: sales
+ })
+
+ with mlflow.start_run() as run:
+ mlflow_deepar_log_model(deepar_model, sample_input)
+
+ run_id = run.info.run_id
+
+ # Load the model and predict
+ loaded_model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model")
+ pred_df = loaded_model.predict(sample_input)
+
+ # Verify the 
prediction output format + self.assertEqual(pred_df.columns.tolist(), [time_col, "yhat"]) + self.assertEqual(len(pred_df), self.prediction_length) + self.assertGreater(pred_df[time_col].min(), sample_input[time_col].max()) \ No newline at end of file