Skip to content

[ML-42739] Add custom forecasting data splits for automl_runtime #145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 21 commits into from
Jul 25, 2024
Merged
24 changes: 17 additions & 7 deletions runtime/databricks/automl_runtime/forecast/pmdarima/training.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class ArimaEstimator:
"""

def __init__(self, horizon: int, frequency_unit: str, metric: str, seasonal_periods: List[int],
num_folds: int = 20, max_steps: int = 150, exogenous_cols: Optional[List[str]] = None) -> None:
num_folds: int = 20, max_steps: int = 150, exogenous_cols: Optional[List[str]] = None,
split_cutoff: Optional[pd.Timestamp] = None) -> None:
"""
:param horizon: Number of periods to forecast forward
:param frequency_unit: Frequency of the time series
Expand All @@ -53,6 +54,7 @@ def __init__(self, horizon: int, frequency_unit: str, metric: str, seasonal_peri
self._num_folds = num_folds
self._max_steps = max_steps
self._exogenous_cols = exogenous_cols
self._split_cutoff = split_cutoff

def fit(self, df: pd.DataFrame) -> pd.DataFrame:
"""
Expand Down Expand Up @@ -88,12 +90,20 @@ def fit(self, df: pd.DataFrame) -> pd.DataFrame:
# so the minimum valid seasonality period is always 1

validation_horizon = utils.get_validation_horizon(history_pd, self._horizon, self._frequency_unit)
cutoffs = utils.generate_cutoffs(
history_pd,
horizon=validation_horizon,
unit=self._frequency_unit,
num_folds=self._num_folds,
)
if self._split_cutoff:
cutoffs = utils.generate_custom_cutoffs(
history_pd,
horizon=validation_horizon,
unit=self._frequency_unit,
split_cutoff=self._split_cutoff
)
else:
cutoffs = utils.generate_cutoffs(
history_pd,
horizon=validation_horizon,
unit=self._frequency_unit,
num_folds=self._num_folds,
)

result = self._fit_predict(history_pd, cutoffs=cutoffs, seasonal_period=m, max_steps=self._max_steps)
metric = result["metrics"]["smape"]
Expand Down
24 changes: 17 additions & 7 deletions runtime/databricks/automl_runtime/forecast/prophet/forecast.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ def __init__(self, horizon: int, frequency_unit: str, metric: str, interval_widt
algo=hyperopt.tpe.suggest, num_folds: int = 5,
max_eval: int = 10, trial_timeout: int = None,
random_state: int = 0, is_parallel: bool = True,
regressors = None, **prophet_kwargs) -> None:
regressors = None,
split_cutoff: Optional[pd.Timestamp] = None, **prophet_kwargs) -> None:
"""
Initialization

Expand Down Expand Up @@ -125,6 +126,7 @@ def __init__(self, horizon: int, frequency_unit: str, metric: str, interval_widt
self._timeout = trial_timeout
self._is_parallel = is_parallel
self._regressors = regressors
self._split_cutoff = split_cutoff
self._prophet_kwargs = prophet_kwargs

def fit(self, df: pd.DataFrame) -> pd.DataFrame:
Expand All @@ -139,12 +141,20 @@ def fit(self, df: pd.DataFrame) -> pd.DataFrame:
seasonality_mode = ["additive", "multiplicative"]

validation_horizon = utils.get_validation_horizon(df, self._horizon, self._frequency_unit)
cutoffs = utils.generate_cutoffs(
df.reset_index(drop=True),
horizon=validation_horizon,
unit=self._frequency_unit,
num_folds=self._num_folds,
)
if self._split_cutoff:
cutoffs = utils.generate_custom_cutoffs(
df.reset_index(drop=True),
horizon=validation_horizon,
unit=self._frequency_unit,
split_cutoff=self._split_cutoff
)
else:
cutoffs = utils.generate_cutoffs(
df.reset_index(drop=True),
horizon=validation_horizon,
unit=self._frequency_unit,
num_folds=self._num_folds,
)

train_fn = partial(_prophet_fit_predict, history_pd=df, horizon=validation_horizon,
frequency=self._frequency_unit, cutoffs=cutoffs,
Expand Down
27 changes: 27 additions & 0 deletions runtime/databricks/automl_runtime/forecast/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,33 @@ def generate_cutoffs(df: pd.DataFrame, horizon: int, unit: str,
)
return list(reversed(result))

def generate_custom_cutoffs(df: pd.DataFrame, horizon: int, unit: str,
                            split_cutoff: pd.Timestamp) -> List[pd.Timestamp]:
    """
    Generate custom cutoff times for cross validation based on user-specified split cutoff.
    Period (step size) is 1.

    The first candidate cutoff is ``split_cutoff`` itself and later candidates move
    forward in time. A candidate is kept only while it is not later than
    ``df["ds"].max() - horizon`` (expressed in ``unit``).

    Cutoffs are NOT guaranteed to be timestamps that exist in ``df``:
    ``split_cutoff`` is used as given, regular steps simply add one ``unit``, and
    when the window ``(cutoff, cutoff + horizon]`` holds no rows the cutoff is moved
    to ``next existing timestamp - horizon`` so that timestamp falls at the end of
    the window.

    :param df: pd.DataFrame of the historical data.
    :param horizon: int number of time into the future for forecasting.
    :param unit: frequency unit of the time series, which must be a pandas offset alias.
    :param split_cutoff: the user-specified cutoff, as the starting point of cutoffs
    :return: list of pd.Timestamp cutoffs for cross-validation, in the order they
        were generated. The list is empty when ``split_cutoff`` is already later
        than ``df["ds"].max() - horizon``.
    """
    # NOTE(review): during review of this function a reviewer questioned whether
    # walking forward with ``+=`` (instead of backward with ``-=``) is sufficient
    # on its own; keep that in mind when changing the stepping logic.
    offset_kwargs = DATE_OFFSET_KEYWORD_MAP[unit]
    period = 1
    period_dateoffset = pd.DateOffset(**offset_kwargs) * period
    horizon_dateoffset = pd.DateOffset(**offset_kwargs) * horizon

    # Loop invariants: computed once instead of rescanning the column per iteration.
    timestamps = df["ds"]
    last_timestamp = timestamps.max()
    max_cutoff = last_timestamp - horizon_dateoffset

    result = []
    cutoff = split_cutoff
    while cutoff <= max_cutoff:
        result.append(cutoff)
        # Timestamps are immutable, so this rebinds ``cutoff`` and leaves the
        # value already stored in ``result`` untouched.
        cutoff += period_dateoffset
        in_window = (timestamps > cutoff) & (timestamps <= cutoff + horizon_dateoffset)
        if not in_window.any() and cutoff < last_timestamp:
            # No rows to validate on in (cutoff, cutoff + horizon]: skip ahead so
            # the next existing timestamp sits exactly at the end of the window.
            # The new cutoff is always later than the old one because that
            # timestamp lies beyond cutoff + horizon.
            closest_date = timestamps[timestamps > cutoff].min()
            cutoff = closest_date - horizon_dateoffset
    return result

def is_quaterly_alias(freq: str):
    """
    Tell whether a frequency alias is listed in ``QUATERLY_OFFSET_ALIAS``.

    :param freq: frequency alias to look up.
    :return: result of the membership test against ``QUATERLY_OFFSET_ALIAS``.
    """
    listed_as_quarterly = freq in QUATERLY_OFFSET_ALIAS
    return listed_as_quarterly

Expand Down
11 changes: 11 additions & 0 deletions runtime/tests/automl_runtime/forecast/pmdarima/training_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,17 @@ def test_fit_success_with_exogenous(self):
results_pd = arima_estimator.fit(self.df_with_exogenous)
self.assertIn("smape", results_pd)
self.assertIn("pickled_model", results_pd)

def test_fit_success_with_split_cutoff(self):
    """Fitting with a user-supplied ``split_cutoff`` yields the expected result columns."""
    user_cutoff = pd.Timestamp('2020-07-17 00:00:00')
    estimator_options = {
        "horizon": 1,
        "frequency_unit": "d",
        "metric": "smape",
        "seasonal_periods": [1, 7],
        "num_folds": 2,
        "split_cutoff": user_cutoff,
    }
    arima_estimator = ArimaEstimator(**estimator_options)

    results_pd = arima_estimator.fit(self.df)

    # Same two membership checks as before, in the same order.
    for expected_key in ("smape", "pickled_model"):
        self.assertIn(expected_key, results_pd)

def test_fit_skip_too_long_seasonality(self):
arima_estimator = ArimaEstimator(horizon=1,
Expand Down
27 changes: 27 additions & 0 deletions runtime/tests/automl_runtime/forecast/prophet/forecast_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,33 @@ def test_training_with_extra_regressors(self):
model_json = json.loads(results["model_json"][0])
self.assertListEqual(model_json["extra_regressors"][0], ["f1", "f2"])

def test_training_with_split_cutoff(self):
    """
    Fit with a user-supplied ``split_cutoff`` on every input frame variant and
    check the reported metrics and the selected ``changepoint_prior_scale``.
    """
    estimator_options = {
        "horizon": 1,
        "frequency_unit": "d",
        "metric": "smape",
        "interval_width": 0.8,
        "country_holidays": "US",
        "search_space": self.search_space,
        "num_folds": 2,
        "trial_timeout": 1000,
        "random_state": 0,
        "is_parallel": False,
        "split_cutoff": pd.Timestamp('2020-07-10 00:00:00'),
    }
    hyperopt_estim = ProphetHyperoptEstimator(**estimator_options)

    # (metric column, expected value, extra assertAlmostEqual keyword arguments),
    # listed in the order the assertions must run.
    metric_expectations = (
        ("mse", 0, {}),
        ("rmse", 0, {"delta": 1e-6}),
        ("mae", 0, {"delta": 1e-6}),
        ("mape", 0, {}),
        ("mdape", 0, {}),
        ("smape", 0, {}),
        ("coverage", 1, {}),
    )

    input_frames = [self.df, self.df_datetime_date, self.df_string_time]
    for frame in input_frames:
        results = hyperopt_estim.fit(frame)
        for column, expected, tolerance in metric_expectations:
            self.assertAlmostEqual(results[column][0], expected, **tolerance)
        # check the best result parameter is inside the search space
        best_model = json.loads(results["model_json"][0])
        chosen_scale = best_model["changepoint_prior_scale"]
        self.assertGreaterEqual(chosen_scale, 0.1)
        self.assertLessEqual(chosen_scale, 0.5)

@patch("databricks.automl_runtime.forecast.prophet.forecast.fmin")
@patch("databricks.automl_runtime.forecast.prophet.forecast.Trials")
@patch("databricks.automl_runtime.forecast.prophet.forecast.partial")
Expand Down
69 changes: 68 additions & 1 deletion runtime/tests/automl_runtime/forecast/utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
from databricks.automl_runtime.forecast import DATE_OFFSET_KEYWORD_MAP
from databricks.automl_runtime.forecast.utils import \
generate_cutoffs, get_validation_horizon, calculate_period_differences, \
is_frequency_consistency, make_future_dataframe, make_single_future_dataframe
is_frequency_consistency, make_future_dataframe, make_single_future_dataframe, \
generate_custom_cutoffs


class TestGetValidationHorizon(unittest.TestCase):
Expand Down Expand Up @@ -177,6 +178,72 @@ def test_generate_cutoffs_success_annualy(self):
self.assertEqual([pd.Timestamp('2018-07-14 00:00:00'), pd.Timestamp('2019-07-14 00:00:00'), pd.Timestamp('2020-07-14 00:00:00')], cutoffs)


class TestTestGenerateCustomCutoffs(unittest.TestCase):
    """Exercises ``generate_custom_cutoffs`` per frequency unit and on data with gaps."""

    @staticmethod
    def _history(timestamps):
        """Build a frame with a ``ds`` column of ``timestamps`` and a positional ``y`` column."""
        frame = pd.DataFrame(timestamps, columns=["ds"])
        return frame.rename_axis("y").reset_index()

    def test_generate_custom_cutoffs_success_hourly(self):
        history = self._history(pd.date_range(start="2020-07-01", periods=168, freq='h'))
        expected = [
            pd.Timestamp('2020-07-07 13:00:00'),
            pd.Timestamp('2020-07-07 14:00:00'),
            pd.Timestamp('2020-07-07 15:00:00'),
            pd.Timestamp('2020-07-07 16:00:00'),
        ]
        actual = generate_custom_cutoffs(
            history, horizon=7, unit="H", split_cutoff=pd.Timestamp('2020-07-07 13:00:00'))
        self.assertEqual(expected, actual)

    def test_generate_custom_cutoffs_success_daily(self):
        history = self._history(pd.date_range(start="2020-07-01", end="2020-08-30", freq='d'))
        actual = generate_custom_cutoffs(
            history, horizon=7, unit="D", split_cutoff=pd.Timestamp('2020-08-21 00:00:00'))
        expected = [
            pd.Timestamp('2020-08-21 00:00:00'),
            pd.Timestamp('2020-08-22 00:00:00'),
            pd.Timestamp('2020-08-23 00:00:00'),
        ]
        self.assertEqual(expected, actual)

    def test_generate_custom_cutoffs_success_weekly(self):
        history = self._history(pd.date_range(start="2020-07-01", periods=52, freq='W'))
        actual = generate_custom_cutoffs(
            history, horizon=7, unit="W", split_cutoff=pd.Timestamp('2021-04-25 00:00:00'))
        expected = [
            pd.Timestamp('2021-04-25 00:00:00'),
            pd.Timestamp('2021-05-02 00:00:00'),
            pd.Timestamp('2021-05-09 00:00:00'),
        ]
        self.assertEqual(expected, actual)

    def test_generate_custom_cutoffs_success_monthly(self):
        history = self._history(
            pd.date_range(start="2020-01-12", periods=24, freq=pd.DateOffset(months=1)))
        actual = generate_custom_cutoffs(
            history, horizon=7, unit="MS", split_cutoff=pd.Timestamp('2021-03-12 00:00:00'))
        expected = [
            pd.Timestamp('2021-03-12 00:00:00'),
            pd.Timestamp('2021-04-12 00:00:00'),
            pd.Timestamp('2021-05-12 00:00:00'),
        ]
        self.assertEqual(expected, actual)

    def test_generate_custom_cutoffs_success_quaterly(self):
        history = self._history(
            pd.date_range(start="2020-07-12", periods=9, freq=pd.DateOffset(months=3)))
        actual = generate_custom_cutoffs(
            history, horizon=7, unit="QS", split_cutoff=pd.Timestamp('2020-07-12 00:00:00'))
        expected = [
            pd.Timestamp('2020-07-12 00:00:00'),
            pd.Timestamp('2020-10-12 00:00:00'),
        ]
        self.assertEqual(expected, actual)

    def test_generate_custom_cutoffs_success_annualy(self):
        history = self._history(
            pd.date_range(start="2012-07-14", periods=10, freq=pd.DateOffset(years=1)))
        actual = generate_custom_cutoffs(
            history, horizon=7, unit="YS", split_cutoff=pd.Timestamp('2012-07-14 00:00:00'))
        expected = [
            pd.Timestamp('2012-07-14 00:00:00'),
            pd.Timestamp('2013-07-14 00:00:00'),
            pd.Timestamp('2014-07-14 00:00:00'),
        ]
        self.assertEqual(expected, actual)

    def test_generate_custom_cutoffs_success_with_small_gaps(self):
        # Rows every 3 days: each 7-day window still contains data.
        history = self._history(pd.date_range(start="2020-07-01", periods=30, freq='3d'))
        actual = generate_custom_cutoffs(
            history, horizon=7, unit="D", split_cutoff=pd.Timestamp('2020-09-17 00:00:00'))
        expected = [
            pd.Timestamp('2020-09-17 00:00:00'),
            pd.Timestamp('2020-09-18 00:00:00'),
            pd.Timestamp('2020-09-19 00:00:00'),
        ]
        self.assertEqual(expected, actual)

    def test_generate_custom_cutoffs_success_with_large_gaps(self):
        # Rows every 9 days: wider than the 7-day horizon, so windows can be empty.
        history = self._history(pd.date_range(start="2020-07-01", periods=30, freq='9d'))
        actual = generate_custom_cutoffs(
            history, horizon=7, unit="D", split_cutoff=pd.Timestamp('2021-03-10 00:00:00'))
        expected = [
            pd.Timestamp('2021-03-10 00:00:00'),
            pd.Timestamp('2021-03-12 00:00:00'),
        ]
        self.assertEqual(expected, actual)


class TestCalculatePeriodsAndFrequency(unittest.TestCase):
def setUp(self) -> None:
return super().setUp()
Expand Down
Loading