diff --git a/cluster_experiments/cupac.py b/cluster_experiments/cupac.py
index 773463d7..47e53fcb 100644
--- a/cluster_experiments/cupac.py
+++ b/cluster_experiments/cupac.py
@@ -117,16 +117,20 @@ def __init__(
         self,
         cupac_model: Optional[BaseEstimator] = None,
         target_col: str = "target",
+        scale_col: Optional[str] = None,
         features_cupac_model: Optional[List[str]] = None,
         cache_fit: bool = True,
     ):
         self.cupac_model: BaseEstimator = cupac_model or EmptyRegressor()
         self.target_col = target_col
+        self.scale_col = scale_col
         self.cupac_outcome_name = f"estimate_{target_col}"
         self.features_cupac_model: List[str] = features_cupac_model or []
         self.is_cupac = not isinstance(self.cupac_model, EmptyRegressor)
         self.cache_fit = cache_fit
 
+        self.check_cupac_config()
+
     def _prep_data_cupac(
         self, df: pd.DataFrame, pre_experiment_df: pd.DataFrame
     ) -> Tuple[pd.DataFrame, pd.DataFrame, pd.Series]:
@@ -136,7 +140,12 @@ def _prep_data_cupac(
         df_predict = df.drop(columns=[self.target_col])
         # Split data into X and y
         pre_experiment_x = pre_experiment_df.drop(columns=[self.target_col])
-        pre_experiment_y = pre_experiment_df[self.target_col]
+        if self.scale_col:
+            pre_experiment_y = (
+                pre_experiment_df[self.target_col] / pre_experiment_df[self.scale_col]
+            )
+        else:
+            pre_experiment_y = pre_experiment_df[self.target_col]
 
         # Keep only cupac features
         if self.features_cupac_model:
@@ -214,3 +223,13 @@ def check_cupac_inputs(self, pre_experiment_df: Optional[pd.DataFrame] = None):
                 "If cupac is not used, pre_experiment_df should not be provided - "
                 "remove pre_experiment_df argument or set cupac_model to not None."
             )
+
+    def check_cupac_config(self):
+        if self.is_cupac and self.target_col in self.features_cupac_model:
+            raise ValueError(
+                "If cupac is used, target_col should not be in features_cupac_model."
+            )
+        if self.is_cupac and self.scale_col in self.features_cupac_model:
+            raise ValueError(
+                "If cupac is used, scale_col should not be in features_cupac_model."
+            )
diff --git a/cluster_experiments/experiment_analysis.py b/cluster_experiments/experiment_analysis.py
index 04d11cbd..ae58d913 100644
--- a/cluster_experiments/experiment_analysis.py
+++ b/cluster_experiments/experiment_analysis.py
@@ -1222,7 +1222,7 @@ def _split_pre_experiment_df(self, df: pd.DataFrame):
 class DeltaMethodAnalysis(ExperimentAnalysis):
     def __init__(
         self,
-        cluster_cols: Optional[List[str]] = None,
+        cluster_cols: List[str],
         target_col: str = "target",
         scale_col: str = "scale",
         treatment_col: str = "treatment",
@@ -1235,13 +1235,12 @@ def __init__(
         The analysis is done on the aggregated data at the cluster level, making computation more efficient.
 
         Arguments:
-            cluster_cols: list of columns to use as clusters. Not available for the CUPED method.
+            cluster_cols: list of columns to use as clusters.
             target_col: name of the column containing the variable to measure (the numerator of the ratio).
             scale_col: name of the column containing the scale variable (the denominator of the ratio).
             treatment_col: name of the column containing the treatment variable.
             treatment: name of the treatment to use as the treated group.
-            covariates: list of columns to use as covariates.
-            ratio_covariates: list of tuples of columns to use as covariates for ratio metrics. First element is the numerator column, second element is the denominator column.
+            covariates: list of columns to use as covariates. These must already be aggregated at the cluster level.
             hypothesis: one of "two-sided", "less", "greater" indicating the alternative hypothesis.
 
         Usage:
 
@@ -1260,7 +1259,8 @@ def __init__(
         DeltaMethodAnalysis(
             cluster_cols=['cluster'],
             target_col='x',
-            scale_col='y'
+            scale_col='y',
+            covariates=['x']
         ).get_pvalue(df)
         ```
         """
@@ -1275,19 +1275,83 @@ def __init__(
         )
         self.scale_col = scale_col
         self.cluster_cols = cluster_cols or []
+        self.covariates = covariates or []
+
+    def _transform_ratio_metric(
+        self, df: pd.DataFrame, numerator: str, denominator: str
+    ) -> pd.DataFrame:
+        df = df.copy()
+        mean_numerator, mean_denominator = df.loc[:, [numerator, denominator]].mean()
+        df[f"{numerator}_{denominator}"] = (
+            df[numerator] / mean_denominator
+            - df[denominator] * mean_numerator / mean_denominator**2
+        )
+        return df
+
+    def _transform_metrics(self, df: pd.DataFrame) -> pd.DataFrame:
+        """
+        Transforms the ratio metric into a column that already contains the Delta Method linearization.
+        The linearized column is treated like any other covariate, which lets the Delta Method handle user-level and non-user-level metrics uniformly.
+        """
+        df = df.copy()
+        # transform target metric if necessary
+        numerator, denominator = self.target_col, self.scale_col
+        df = self._transform_ratio_metric(df, numerator, denominator)
+        df[f"original_{numerator}_{denominator}"] = df[numerator] / df[denominator]
+        self.target_metric = f"{numerator}_{denominator}"
+
+        return df
 
-        if covariates is not None:
-            warnings.warn(
-                "Covariates are not supported in the Delta Method approximation for the time being. They will be ignored."
+    def _compute_thetas(self, df: pd.DataFrame) -> np.ndarray:
+        """
+        Computes the theta values (covariate coefficients) for the CUPED method.
+        """
+
+        data = df[[self.target_metric] + self.covariates]
+        cov_mat = data.cov()  # nxn
+        sigma = cov_mat.iloc[1:, 1:]  # (n-1)x(n-1)
+        z = cov_mat.iloc[1:, 0]  # (n-1)x1
+
+        thetas = np.dot(np.linalg.inv(sigma), z)
+        return thetas
+
+    def _get_y_hat(
+        self, df: pd.DataFrame, theta: Optional[np.ndarray] = None
+    ) -> np.ndarray:
+        """
+        Compute the CUPED estimate of the target metric.
+        """
+
+        Y = df["original_" + self.target_metric].values.astype(float)
+        covariates = df[self.covariates]
+
+        Y_cv = Y.copy()
+        for k, _covariate in enumerate(self.covariates):
+            Y_cv -= theta[k] * (
+                covariates.values[:, k] - covariates.values[:, k].mean()
             )
-        if cluster_cols is None:
-            raise ValueError(
-                "cluster_cols must be provided for the Delta Method analysis"
+        return Y_cv
+
+    def _get_var_y_hat(
+        self, df: pd.DataFrame, theta: Optional[np.ndarray] = None
+    ) -> float:
+        """
+        Compute the variance of the CUPED estimate. Also works for the non-CUPED estimate when no covariates are given.
+        """
+
+        group_size = len(df)
+
+        Y_var = df[self.target_metric].var()
+        for k, covariate in enumerate(self.covariates):
+            Y_var += (
+                theta[k] ** 2 * df[covariate].var()
+                - 2 * theta[k] * np.cov(df[covariate], df[self.target_metric])[0, 1]
             )
+        return Y_var / group_size
 
     def _aggregate_to_cluster(self, df: pd.DataFrame) -> pd.DataFrame:
         """
-        Returns an aggreegated dataframe of the target and scale variables at the cluster (and treatment) level.
+        Returns an aggregated dataframe of the target and scale variables at the cluster (and treatment) level.
 
         Arguments:
             df: dataframe containing the data to analyze
@@ -1301,30 +1365,23 @@ def _aggregate_to_cluster(self, df: pd.DataFrame) -> pd.DataFrame:
         """
 
     def _get_group_mean_and_variance(self, df: pd.DataFrame) -> tuple[float, float]:
         """
         Returns the mean and variance of the ratio metric (target/scale) as estimated by the delta method for a given group (treatment).
+        If covariates are given, variance reduction is used. For this to work, the dataframe must already be aggregated at the cluster level, so that no assumptions about covariate aggregation need to be made.
 
         Arguments:
             df: dataframe containing the data to analyze.
         """
-        df = self._aggregate_to_cluster(df)
-        group_size = len(df)
-
-        if group_size < 1000:
-            self.__warn_small_group_size()
-
-        target_mean, scale_mean = df.loc[:, [self.target_col, self.scale_col]].mean()
-        target_variance, scale_variance = df.loc[
-            :, [self.target_col, self.scale_col]
-        ].var()
-        target_sum, scale_sum = df.loc[:, [self.target_col, self.scale_col]].sum()
-
-        target_scale_cov = df.loc[:, self.target_col].cov(df.loc[:, self.scale_col])
+        df = df.copy()
+        df = self._transform_metrics(df)
+        if self.covariates:
+            thetas = self._compute_thetas(df)
+            Y_hat = self._get_y_hat(df, thetas)
+            group_mean = Y_hat.mean()
+            group_variance = self._get_var_y_hat(df, thetas)
+        else:
+            Y_hat = self._get_y_hat(df)
+            group_mean = Y_hat.mean()
+            group_variance = self._get_var_y_hat(df)
 
-        group_mean = target_sum / scale_sum
-        group_variance = (
-            (1 / (scale_mean**2)) * target_variance
-            + (target_mean**2) / (scale_mean**4) * scale_variance
-            - (2 * target_mean) / (scale_mean**3) * target_scale_cov
-        ) / group_size
         return group_mean, group_variance
 
     def _get_mean_standard_error(self, df: pd.DataFrame) -> tuple[float, float]:
@@ -1333,6 +1390,14 @@
         Variance reduction is used if covariates are given.
         """
 
+        if (self._get_num_clusters(df) < 1000).any():
+            self.__warn_small_group_size()
+
+        if self.covariates:
+            self.__check_data_is_aggregated(df)
+        else:
+            df = self._aggregate_to_cluster(df)
+
         is_treatment = df[self.treatment_col] == 1
         treat_mean, treat_var = self._get_group_mean_and_variance(df[is_treatment])
         ctrl_mean, ctrl_var = self._get_group_mean_and_variance(df[~is_treatment])
@@ -1457,6 +1522,24 @@ def from_config(cls, config):
             hypothesis=config.hypothesis,
         )
 
+    def __check_data_is_aggregated(self, df):
+        """
+        Check that the data is already aggregated at the cluster level.
+        """
+
+        if df.groupby(self.cluster_cols).size().max() > 1:
+            raise ValueError(
+                "The data should be aggregated at the cluster level for the Delta Method analysis using CUPED."
+            )
+
+    def _get_num_clusters(self, df):
+        """
+        Returns the number of distinct clusters in each treatment group.
+ """ + return df.groupby(self.treatment_col).apply( + lambda x: self._get_cluster_column(x).nunique() + ) + def __warn_small_group_size(self): warnings.warn( "Delta Method approximation may not be accurate for small group sizes" diff --git a/cluster_experiments/power_analysis.py b/cluster_experiments/power_analysis.py index 609dfe29..9baaa622 100644 --- a/cluster_experiments/power_analysis.py +++ b/cluster_experiments/power_analysis.py @@ -103,6 +103,7 @@ def __init__( n_simulations: int = 100, alpha: float = 0.05, features_cupac_model: Optional[List[str]] = None, + scale_col: Optional[str] = None, seed: Optional[int] = None, hypothesis: str = "two-sided", ): @@ -120,6 +121,7 @@ def __init__( self.cupac_handler = CupacHandler( cupac_model=cupac_model, target_col=target_col, + scale_col=scale_col, features_cupac_model=features_cupac_model, ) if seed is not None: diff --git a/tests/analysis/test_analysis.py b/tests/analysis/test_analysis.py index e6784fad..53dca89f 100644 --- a/tests/analysis/test_analysis.py +++ b/tests/analysis/test_analysis.py @@ -311,6 +311,7 @@ def test_delta_analysis_aggregation(dates): def test_stats_delta_vs_ols(analysis_ratio_df, experiment_dates): + np.random.seed(2024) experiment_start_date = min(experiment_dates) analyser_ols = ClusteredOLSAnalysis(cluster_cols=["user"]) @@ -324,8 +325,124 @@ def test_stats_delta_vs_ols(analysis_ratio_df, experiment_dates): SE_delta = analyser_delta.get_standard_error(df) assert point_estimate_delta == pytest.approx( - point_estimate_ols, rel=1e-2 + point_estimate_ols, rel=1e-1 + ), "Point estimate is not consistent with Clustered OLS" + assert SE_delta == pytest.approx( + SE_ols, rel=1e-1 + ), "Standard error is not consistent with Clustered OLS" + + +def test_delta_cuped_analysis(analysis_ratio_df, experiment_dates): + experiment_start = min(experiment_dates) + + df_experiment = analysis_ratio_df.query(f"date >= '{experiment_start}'") + df_pre_experiment = analysis_ratio_df.query(f"date < '{experiment_start}'") + df_experiment = df_experiment.groupby(["user", "treatment"], as_index=False).agg( + {"target": "sum", "scale": "sum", "user_target_means": "mean"} + ) + df_pre_experiment = df_pre_experiment.groupby( + ["user", "treatment"], as_index=False + ).agg( + pre_target=("target", "sum"), + pre_scale=("scale", "sum"), + ) + df = df_experiment.merge(df_pre_experiment, on=["user", "treatment"]) + + analyser = DeltaMethodAnalysis( + cluster_cols=["user"], + scale_col="scale", + covariates=["user_target_means"], + ) + + assert 0.05 >= analyser.get_pvalue(df) >= 0 + + +def test_aa_delta_cuped_analysis(dates): + + analyser = DeltaMethodAnalysis( + cluster_cols=["user"], + scale_col="scale", + covariates=["pre_user_target_mean"], + ) + np.random.seed(2024) + p_values = [] + for _ in range(1000): + data = generate_ratio_metric_data( + dates, 40_000, num_users=5000, treatment_effect=0 + ) + pre_data = generate_ratio_metric_data( + dates, 40_000, num_users=5000, treatment_effect=0 + ) + + data = data.groupby(["user", "treatment"], as_index=False).agg( + {"target": "sum", "scale": "sum"} + ) + pre_data = pre_data.groupby(["user", "treatment"], as_index=False).agg( + pre_target=("target", "sum"), + pre_scale=("scale", "sum"), + pre_user_target_mean=("user_target_means", "mean"), + ) + data = data.merge(pre_data, on=["user", "treatment"]) + p_values.append(analyser.get_pvalue(data)) + + positive_rate = sum(p < 0.05 for p in p_values) / len(p_values) + + assert positive_rate == pytest.approx( + 0.05, abs=0.01 + ), "P-value A/A calculation 
+
+
+def test_stats_delta_cuped_vs_ols(analysis_ratio_df, experiment_dates):
+    np.random.seed(2024)
+
+    experiment_start_date = min(experiment_dates)
+
+    analyser_ols = ClusteredOLSAnalysis(
+        cluster_cols=["user"], covariates=["pre_user_target_mean"]
+    )
+    analyser_delta = DeltaMethodAnalysis(
+        cluster_cols=["user"],
+        scale_col="scale",
+        covariates=["pre_user_target_mean"],
+    )
+
+    df_experiment = analysis_ratio_df.query(f"date >= '{experiment_start_date}'")
+    df_pre_experiment = analysis_ratio_df.query(f"date < '{experiment_start_date}'")
+
+    df_pre_experiment = df_pre_experiment.groupby(
+        ["user", "treatment"], as_index=False
+    ).agg(
+        pre_target=("target", "sum"),
+        pre_scale=("scale", "sum"),
+        pre_user_target_mean=("user_target_means", "mean"),
+    )
+    df = df_experiment.merge(df_pre_experiment, on=["user", "treatment"])
+    # impute nans with mean values
+    df["pre_target"] = df["pre_target"].fillna(df["pre_target"].mean())
+    df["pre_scale"] = df["pre_scale"].fillna(df["pre_scale"].mean())
+    df["pre_user_target_mean"] = df["pre_user_target_mean"].fillna(
+        df["pre_user_target_mean"].mean()
+    )
+
+    df_delta = df.groupby(["user", "treatment"], as_index=False).agg(
+        {
+            "target": "sum",
+            "scale": "sum",
+            "pre_target": "mean",
+            "pre_scale": "mean",
+            "pre_user_target_mean": "mean",
+        }
+    )
+
+    point_estimate_ols = analyser_ols.get_point_estimate(df)
+    point_estimate_delta = analyser_delta.get_point_estimate(df_delta)
+
+    SE_ols = analyser_ols.get_standard_error(df)
+    SE_delta = analyser_delta.get_standard_error(df_delta)
+
+    assert point_estimate_delta == pytest.approx(
+        point_estimate_ols, rel=1e-1
     ), "Point estimate is not consistent with Clustered OLS"
     assert SE_delta == pytest.approx(
-        SE_ols, rel=1e-2
+        SE_ols, rel=1e-1
     ), "Standard error is not consistent with Clustered OLS"
diff --git a/tests/cupac/test_cupac_handler.py b/tests/cupac/test_cupac_handler.py
index b2771c60..878426d9 100644
--- a/tests/cupac/test_cupac_handler.py
+++ b/tests/cupac/test_cupac_handler.py
@@ -96,3 +96,25 @@ def test_no_cupac(missing_cupac, df_feats):
     """Checks that if there is no cupac model, pre_experiment_df should not be provided"""
     with pytest.raises(ValueError, match="remove pre_experiment_df argument"):
         missing_cupac.add_covariates(df_feats, df_feats.head(10))
+
+
+def test_no_scale_covariates():
+    """Checks that when using the Delta Method, the scale column should not be in features_cupac_model"""
+    with pytest.raises(ValueError, match="should not be in features_cupac_model"):
+        CupacHandler(
+            cupac_model=HistGradientBoostingRegressor(max_iter=3),
+            target_col="target",
+            scale_col="scale",
+            features_cupac_model=["x1", "x2", "scale"],
+        )
+
+
+def test_no_target_covariates():
+    """Checks that when using the Delta Method, the target column should not be in features_cupac_model"""
+    with pytest.raises(ValueError, match="should not be in features_cupac_model"):
+        CupacHandler(
+            cupac_model=HistGradientBoostingRegressor(max_iter=3),
+            target_col="target",
+            scale_col="scale",
+            features_cupac_model=["x1", "x2", "target"],
+        )
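
For anyone trying this branch locally, here is a minimal sketch of the new covariate-adjusted Delta Method path. This example is not part of the patch: the column names `user`, `target`, `scale`, `pre_ratio` and the synthetic data are illustrative, and the library's default `treatment_col`/`treatment` values are assumed. Note that once `covariates` is set, the input must already contain one row per cluster, as `__check_data_is_aggregated` enforces.

```python
import numpy as np
import pandas as pd

from cluster_experiments import DeltaMethodAnalysis

# Synthetic cluster-level data: one row per user, with experiment-period
# sums and a pre-experiment covariate already aggregated per cluster.
rng = np.random.default_rng(2024)
n_users = 2_000
df = pd.DataFrame(
    {
        "user": np.arange(n_users),
        "treatment": rng.choice(["A", "B"], size=n_users),
        "scale": rng.poisson(20, size=n_users) + 1,
        "pre_ratio": rng.uniform(0.3, 0.7, size=n_users),
    }
)
# Target correlated with the pre-experiment ratio, so CUPED has variance to remove.
df["target"] = rng.poisson(df["scale"] * df["pre_ratio"])

analyser = DeltaMethodAnalysis(
    cluster_cols=["user"],
    target_col="target",
    scale_col="scale",
    covariates=["pre_ratio"],  # non-empty covariates -> CUPED variance-reduction path
)
print(analyser.get_pvalue(df))
```

Internally this relies on the per-cluster linearization added in `_transform_ratio_metric` (roughly `target / mean(scale) - scale * mean(target) / mean(scale)**2`), whose variance reproduces the classical delta-method variance that the removed closed-form block used to compute.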
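Similarly, a sketch of the new `scale_col` behaviour in `CupacHandler`: when `scale_col` is set, `_prep_data_cupac` fits the cupac model on `target / scale` from the pre-experiment period instead of the raw target, and the prediction is exposed under `cupac_outcome_name` (`estimate_target` here). The features `x1`/`x2` and the data-generating helper below are illustrative assumptions, not part of the patch.

```python
import numpy as np
import pandas as pd
from sklearn.ensemble import HistGradientBoostingRegressor

from cluster_experiments.cupac import CupacHandler

rng = np.random.default_rng(0)


def make_df(n: int) -> pd.DataFrame:
    # Illustrative features x1/x2 plus the target and scale columns.
    x1 = rng.normal(size=n)
    x2 = rng.normal(size=n)
    scale = rng.poisson(10, size=n) + 1
    target = rng.poisson(np.exp(0.1 * x1) * scale)
    return pd.DataFrame({"x1": x1, "x2": x2, "target": target, "scale": scale})


handler = CupacHandler(
    cupac_model=HistGradientBoostingRegressor(max_iter=10),
    target_col="target",
    scale_col="scale",  # cupac model is fit on target / scale
    features_cupac_model=["x1", "x2"],
)
# Fits on the pre-experiment ratio and adds the "estimate_target" column to df.
df = handler.add_covariates(make_df(500), make_df(500))
print(df["estimate_target"].head())
```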