Skip to content

[ML-47076] Support custom frequencies #162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits on Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions runtime/databricks/automl_runtime/forecast/deepar/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,16 @@ class DeepARModel(ForecastModel):
DeepAR mlflow model wrapper for forecasting.
"""

def __init__(self, model: PyTorchPredictor, horizon: int, frequency: str,
def __init__(self, model: PyTorchPredictor, horizon: int, frequency_unit: str, frequency_quantity: int,
num_samples: int,
target_col: str, time_col: str,
id_cols: Optional[List[str]] = None) -> None:
"""
Initialize the DeepAR mlflow Python model wrapper
:param model: DeepAR model
:param horizon: the number of periods to forecast forward
:param frequency: the frequency of the time series
:param frequency_unit: the frequency unit of the time series
:param frequency_quantity: the frequency quantity of the time series
:param num_samples: the number of samples to draw from the distribution
:param target_col: the target column name
:param time_col: the time column name
Expand All @@ -60,7 +61,8 @@ def __init__(self, model: PyTorchPredictor, horizon: int, frequency: str,
super().__init__()
self._model = model
self._horizon = horizon
self._frequency = frequency
self._frequency_unit = frequency_unit
self._frequency_quantity = frequency_quantity
self._num_samples = num_samples
self._target_col = target_col
self._time_col = time_col
Expand Down Expand Up @@ -128,7 +130,8 @@ def predict_samples(self,

model_input_transformed = set_index_and_fill_missing_time_steps(model_input,
self._time_col,
self._frequency,
self._frequency_unit,
self._frequency_quantity,
self._id_cols)

test_ds = PandasDataset(model_input_transformed, target=self._target_col)
Expand Down
32 changes: 19 additions & 13 deletions runtime/databricks/automl_runtime/forecast/deepar/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@
import pandas as pd


def validate_and_generate_index(df: pd.DataFrame, time_col: str, frequency: str):
def validate_and_generate_index(df: pd.DataFrame,
time_col: str,
frequency_unit: str,
frequency_quantity: int):
"""
Generate a complete time index for the given DataFrame based on the specified frequency.
- Ensures the time column is in datetime format.
- Validates consistency in the day of the month if the frequency unit is "MS" (month start).
- Generates a new time index from the minimum to the maximum timestamp in the data.
:param df: The input DataFrame containing the time column.
:param time_col: The name of the time column.
:param frequency: The frequency of the time series.
:param frequency_unit: The frequency unit of the time series.
:param frequency_quantity: The frequency quantity of the time series.
:return: A complete time index covering the full range of the dataset.
:raises ValueError: If the day-of-month pattern is inconsistent for "MS" frequency.
"""
if frequency.upper() != "MS":
return pd.date_range(df[time_col].min(), df[time_col].max(), freq=frequency)
if frequency_unit.upper() != "MS":
return pd.date_range(df[time_col].min(), df[time_col].max(), freq=f"{frequency_quantity}{frequency_unit}")

df[time_col] = pd.to_datetime(df[time_col]) # Ensure datetime format

Expand Down Expand Up @@ -63,7 +67,8 @@ def validate_and_generate_index(df: pd.DataFrame, time_col: str, frequency: str)
return new_index_full

def set_index_and_fill_missing_time_steps(df: pd.DataFrame, time_col: str,
frequency: str,
frequency_unit: str,
frequency_quantity: int,
id_cols: Optional[List[str]] = None):
"""
Transform the input dataframe to an acceptable format for the GluonTS library.
Expand All @@ -73,20 +78,21 @@ def set_index_and_fill_missing_time_steps(df: pd.DataFrame, time_col: str,

:param df: the input dataframe that contains time_col
:param time_col: time column name
:param frequency: the frequency of the time series
:param frequency_unit: the frequency unit of the time series
:param frequency_quantity: the frequency quantity of the time series
:param id_cols: the column names of the identity columns for multi-series time series; None for single series
:return: single-series - transformed dataframe;
multi-series - dictionary of transformed dataframes, each key is the (concatenated) id of the time series
"""
total_min, total_max = df[time_col].min(), df[time_col].max()

# We need to adjust the frequency for pd.date_range if it is weekly,
# For weekly data, anchor the frequency_unit to the weekday of the earliest timestamp,
# because a bare "W" in pd.date_range always means "W-SUN".
if frequency.upper() == "W":
if frequency_unit.upper() == "W":
weekday_name = total_min.strftime("%a").upper() # e.g., "FRI"
frequency = f"W-{weekday_name}"
frequency_unit = f"W-{weekday_name}"

new_index_full = validate_and_generate_index(df=df, time_col=time_col, frequency=frequency)
valid_index = validate_and_generate_index(df=df, time_col=time_col, frequency_unit=frequency_unit, frequency_quantity=frequency_quantity)

if id_cols is not None:
df_dict = {}
Expand All @@ -96,16 +102,16 @@ def set_index_and_fill_missing_time_steps(df: pd.DataFrame, time_col: str,
else:
ts_id = str(grouped_id)
df_dict[ts_id] = (grouped_df.set_index(time_col).sort_index()
.reindex(new_index_full).drop(id_cols, axis=1))
.reindex(valid_index).drop(id_cols, axis=1))

return df_dict

df = df.set_index(time_col).sort_index()

# Fill in missing time steps between the min and max time steps
df = df.reindex(new_index_full)
df = df.reindex(valid_index)

if frequency.upper() == "MS":
if frequency_unit.upper() == "MS":
# Truncate the day of month to avoid issues with pandas frequency check
df = df.to_period("M")

Expand Down
50 changes: 29 additions & 21 deletions runtime/databricks/automl_runtime/forecast/pmdarima/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,19 @@ def model_env(self):
return ARIMA_CONDA_ENV

@staticmethod
def _get_ds_indices(start_ds: pd.Timestamp, periods: int, frequency: str) -> pd.DatetimeIndex:
def _get_ds_indices(start_ds: pd.Timestamp, periods: int, frequency_unit: str, frequency_quantity: int) -> pd.DatetimeIndex:
"""
Create a DatetimeIndex with specified starting time and frequency, whose length is the given periods.
:param start_ds: the pd.Timestamp as the start of the DatetimeIndex.
:param periods: the length of the DatetimeIndex.
:param frequency: the frequency of the DatetimeIndex.
:param frequency_unit: the frequency unit of the DatetimeIndex.
:param frequency_quantity: the frequency quantity of the DatetimeIndex.
:return: a DatetimeIndex.
"""
ds_indices = pd.date_range(
start=start_ds,
periods=periods,
freq=pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[frequency])
freq=pd.DateOffset(**DATE_OFFSET_KEYWORD_MAP[frequency_unit]) * frequency_quantity
)
modified_start_ds = ds_indices.min()
if start_ds != modified_start_ds:
Expand All @@ -89,14 +90,15 @@ class ArimaModel(AbstractArimaModel):
ARIMA mlflow model wrapper for univariate forecasting.
"""

def __init__(self, pickled_model: bytes, horizon: int, frequency: str,
start_ds: pd.Timestamp, end_ds: pd.Timestamp,
def __init__(self, pickled_model: bytes, horizon: int, frequency_unit: str,
frequency_quantity: int, start_ds: pd.Timestamp, end_ds: pd.Timestamp,
time_col: str, exogenous_cols: Optional[List[str]] = None) -> None:
"""
Initialize the mlflow Python model wrapper for ARIMA.
:param pickled_model: the pickled ARIMA model as a bytes object.
:param horizon: int number of periods to forecast forward.
:param frequency: the frequency of the time series
:param frequency_unit: the frequency unit of the time series
:param frequency_quantity: the frequency quantity of the time series
:param start_ds: the start time of training data
:param end_ds: the end time of training data
:param time_col: the column name of the time column
Expand All @@ -106,7 +108,8 @@ def __init__(self, pickled_model: bytes, horizon: int, frequency: str,
super().__init__()
self._pickled_model = pickled_model
self._horizon = horizon
self._frequency = OFFSET_ALIAS_MAP[frequency]
self._frequency_unit = OFFSET_ALIAS_MAP[frequency_unit]
self._frequency_quantity = frequency_quantity
self._start_ds = pd.to_datetime(start_ds)
self._end_ds = pd.to_datetime(end_ds)
self._time_col = time_col
Expand Down Expand Up @@ -157,7 +160,8 @@ def make_future_dataframe(self, horizon: int = None, include_history: bool = Tru
start_time=self._start_ds,
end_time=self._end_ds,
horizon=horizon or self._horizon,
frequency=self._frequency,
frequency_unit=self._frequency_unit,
frequency_quantity=self._frequency_quantity,
include_history=include_history
)

Expand Down Expand Up @@ -192,7 +196,7 @@ def _predict_impl(self, input_df: pd.DataFrame) -> pd.DataFrame:
)
# Check if the time has correct frequency
consistency = df["ds"].apply(lambda x:
is_frequency_consistency(self._start_ds, x, self._frequency)
is_frequency_consistency(self._start_ds, x, self._frequency_unit, self._frequency_quantity)
).all()
if not consistency:
raise MlflowException(
Expand All @@ -203,7 +207,7 @@ def _predict_impl(self, input_df: pd.DataFrame) -> pd.DataFrame:
)
preds_pds = []
# Out-of-sample prediction if needed
horizon = calculate_period_differences(self._end_ds, max(df["ds"]), self._frequency)
horizon = calculate_period_differences(self._end_ds, max(df["ds"]), self._frequency_unit, self._frequency_quantity)
if horizon > 0:
X_future = df[df["ds"] > self._end_ds].set_index("ds")
future_pd = self._forecast(
Expand All @@ -229,8 +233,8 @@ def _predict_in_sample(
end_ds: pd.Timestamp = None,
X: pd.DataFrame = None) -> pd.DataFrame:
if start_ds and end_ds:
start_idx = calculate_period_differences(self._start_ds, start_ds, self._frequency)
end_idx = calculate_period_differences(self._start_ds, end_ds, self._frequency)
start_idx = calculate_period_differences(self._start_ds, start_ds, self._frequency_unit, self._frequency_quantity)
end_idx = calculate_period_differences(self._start_ds, end_ds, self._frequency_unit, self._frequency_quantity)
else:
start_ds = self._start_ds
end_ds = self._end_ds
Expand All @@ -242,8 +246,8 @@ def _predict_in_sample(
start=start_idx,
end=end_idx,
return_conf_int=True)
periods = calculate_period_differences(self._start_ds, end_ds, self._frequency) + 1
ds_indices = self._get_ds_indices(start_ds=self._start_ds, periods=periods, frequency=self._frequency)[start_idx:]
periods = calculate_period_differences(self._start_ds, end_ds, self._frequency_unit, self._frequency_quantity) + 1
ds_indices = self._get_ds_indices(start_ds=self._start_ds, periods=periods, frequency_unit=self._frequency_unit, frequency_quantity=self._frequency_quantity)[start_idx:]
in_sample_pd = pd.DataFrame({'ds': ds_indices, 'yhat': preds_in_sample})
in_sample_pd[["yhat_lower", "yhat_upper"]] = conf_in_sample
return in_sample_pd
Expand All @@ -257,7 +261,7 @@ def _forecast(
horizon,
X=X,
return_conf_int=True)
ds_indices = self._get_ds_indices(start_ds=self._end_ds, periods=horizon + 1, frequency=self._frequency)[1:]
ds_indices = self._get_ds_indices(start_ds=self._end_ds, periods=horizon + 1, frequency_unit=self._frequency_unit, frequency_quantity=self._frequency_quantity)[1:]
preds_pd = pd.DataFrame({'ds': ds_indices, 'yhat': preds})
preds_pd[["yhat_lower", "yhat_upper"]] = conf
return preds_pd
Expand All @@ -268,14 +272,15 @@ class MultiSeriesArimaModel(AbstractArimaModel):
ARIMA mlflow model wrapper for multi-series forecasting.
"""

def __init__(self, pickled_model_dict: Dict[Tuple, bytes], horizon: int, frequency: str,
def __init__(self, pickled_model_dict: Dict[Tuple, bytes], horizon: int, frequency_unit: str, frequency_quantity: int,
start_ds_dict: Dict[Tuple, pd.Timestamp], end_ds_dict: Dict[Tuple, pd.Timestamp],
time_col: str, id_cols: List[str], exogenous_cols: Optional[List[str]] = None) -> None:
"""
Initialize the mlflow Python model wrapper for multiseries ARIMA.
:param pickled_model_dict: the dictionary of binarized ARIMA models for different time series.
:param horizon: int number of periods to forecast forward.
:param frequency: the frequency of the time series
:param frequency_unit: the frequency unit of the time series
:param frequency_quantity: the frequency quantity of the time series
:param start_ds_dict: the dictionary of the starting time of each time series in training data.
:param end_ds_dict: the dictionary of the end time of each time series in training data.
:param time_col: the column name of the time column
Expand All @@ -286,7 +291,8 @@ def __init__(self, pickled_model_dict: Dict[Tuple, bytes], horizon: int, frequen
super().__init__()
self._pickled_models = pickled_model_dict
self._horizon = horizon
self._frequency = frequency
self._frequency_unit = frequency_unit
self._frequency_quantity = frequency_quantity
self._starts = start_ds_dict
self._ends = end_ds_dict
self._time_col = time_col
Expand Down Expand Up @@ -329,7 +335,8 @@ def make_future_dataframe(
start_time=self._starts,
end_time=self._ends,
horizon=horizon,
frequency=self._frequency,
frequency_unit=self._frequency_unit,
frequency_quantity=self._frequency_quantity,
include_history=include_history,
groups=groups,
identity_column_names=self._id_cols
Expand Down Expand Up @@ -360,7 +367,7 @@ def _predict_timeseries_single_id(
horizon: int,
include_history: bool = True,
df: Optional[pd.DataFrame] = None) -> pd.DataFrame:
arima_model_single_id = ArimaModel(self._pickled_models[id_], self._horizon, self._frequency,
arima_model_single_id = ArimaModel(self._pickled_models[id_], self._horizon, self._frequency_unit, self._frequency_quantity,
self._starts[id_], self._ends[id_], self._time_col, self._exogenous_cols)
preds_df = arima_model_single_id.predict_timeseries(horizon, include_history, df)
for id, col_name in zip(id_, self._id_cols):
Expand Down Expand Up @@ -401,7 +408,8 @@ def _predict_single_id(self, df: pd.DataFrame) -> pd.DataFrame:
id_ = df["ts_id"].to_list()[0]
arima_model_single_id = ArimaModel(self._pickled_models[id_],
self._horizon,
self._frequency,
self._frequency_unit,
self._frequency_quantity,
self._starts[id_],
self._ends[id_],
self._time_col,
Expand Down
Loading