diff --git a/.gitignore b/.gitignore index ab5c6116..0ed059fb 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,7 @@ lib/ build/ lightning_logs/ +.mypy_cache/ +__pycache__/ +*.pyc +*instructions.md diff --git a/molpipeline/experimental/uncertainty/__init__.py b/molpipeline/experimental/uncertainty/__init__.py new file mode 100644 index 00000000..27e28194 --- /dev/null +++ b/molpipeline/experimental/uncertainty/__init__.py @@ -0,0 +1,11 @@ +"""Wrappers for conformal prediction in MolPipeline. + +Provides ConformalPredictor and CrossConformalPredictor for robust uncertainty quantification. +""" + +from molpipeline.experimental.uncertainty.conformal import ( + ConformalPredictor, + CrossConformalPredictor, +) + +__all__ = ["ConformalPredictor", "CrossConformalPredictor"] diff --git a/molpipeline/experimental/uncertainty/conformal.py b/molpipeline/experimental/uncertainty/conformal.py new file mode 100644 index 00000000..ee7a0186 --- /dev/null +++ b/molpipeline/experimental/uncertainty/conformal.py @@ -0,0 +1,996 @@ +"""Conformal prediction wrappers for classification and regression using crepes.""" + +from collections.abc import Callable +from typing import Any, Literal + +import numpy as np +import numpy.typing as npt +from crepes import WrapClassifier, WrapRegressor +from crepes.extras import DifficultyEstimator, MondrianCategorizer +from scipy.stats import mode +from sklearn.base import BaseEstimator, clone +from sklearn.model_selection import KFold, StratifiedKFold +from sklearn.utils import check_random_state + +from molpipeline.experimental.uncertainty.utils import ( + _bin_targets, + _detect_estimator_type, +) + + +class ConformalPredictor(BaseEstimator): # pylint: disable=too-many-instance-attributes + """Conformal prediction wrapper for both classifiers and regressors. + + Uses crepes under the hood. + """ + + def __init__( + self, + estimator: BaseEstimator, + *, + mondrian: MondrianCategorizer | Callable[..., Any] | bool = False, + confidence_level: float = 0.9, + estimator_type: Literal["classifier", "regressor", "auto"] = "auto", + nonconformity: ( + Callable[ + [npt.NDArray[Any], npt.NDArray[Any] | None, npt.NDArray[Any] | None], + npt.NDArray[Any], + ] + | None + ) = None, + difficulty_estimator: DifficultyEstimator | None = None, + binning: int | MondrianCategorizer | None = None, + n_jobs: int = 1, + **kwargs: Any, + ) -> None: + """Initialize ConformalPredictor. + + Parameters + ---------- + estimator : BaseEstimator + The base estimator or pipeline to wrap. + mondrian : MondrianCategorizer | Callable[..., Any] | bool, optional + Mondrian calibration/grouping (default: False). + confidence_level : float, optional + Confidence level for prediction sets/intervals (default: 0.9). + estimator_type : Literal["classifier", "regressor", "auto"], optional + Type of estimator (default: "auto"). + nonconformity : Callable, optional + Nonconformity function for classification. + difficulty_estimator : DifficultyEstimator | None, optional + Difficulty estimator for normalized conformal prediction (regression). + binning : int | MondrianCategorizer | None, optional + Number of bins or MondrianCategorizer for Mondrian calibration (regression). + n_jobs : int, optional + Number of parallel jobs (default: 1). + **kwargs : Any + Additional keyword arguments for crepes. + + Raises + ------ + ValueError + For invalid parameters. + + """ + if not 0 < confidence_level < 1: + raise ValueError( + f"confidence_level must be in (0, 1), got {confidence_level}", + ) + + if estimator_type == "auto": + estimator_type = _detect_estimator_type(estimator) + elif estimator_type not in {"classifier", "regressor"}: + raise ValueError( + f"estimator_type must be 'classifier', 'regressor', " + f"or 'auto', got {estimator_type}", + ) + + if estimator_type == "regressor" and mondrian is True: + raise ValueError( + "mondrian=True is supported for classification.", + ) + + if binning is not None and estimator_type == "classifier": + raise ValueError( + "binning parameter is only supported for regression.", + ) + + if isinstance(binning, int) and binning <= 0: + raise ValueError(f"binning must be positive integer, got {binning}") + + self.estimator = estimator + self.mondrian = mondrian + self.confidence_level = confidence_level + self.estimator_type = estimator_type + self.nonconformity = nonconformity + self.difficulty_estimator = difficulty_estimator + self.binning = binning + self.n_jobs = n_jobs + self.kwargs = kwargs + self._conformal: WrapClassifier | WrapRegressor | None = None + self.fitted_ = False + self.calibrated_ = False + + def fit(self, x: npt.NDArray[Any], y: npt.NDArray[Any]) -> "ConformalPredictor": + """Fit the conformal predictor. + + Parameters + ---------- + x : npt.NDArray[Any] + Training features. + y : npt.NDArray[Any] + Training targets. + + Returns + ------- + ConformalPredictor + Self. + + Raises + ------ + ValueError + For invalid types and uninitialized. + RuntimeError + For initialization failures. + + """ + if self.estimator_type == "classifier": + self._conformal = WrapClassifier(clone(self.estimator)) + elif self.estimator_type == "regressor": + self._conformal = WrapRegressor(clone(self.estimator)) + else: + raise ValueError("estimator_type must be 'classifier' or 'regressor'") + + if self._conformal is None: # Type narrowing + raise RuntimeError("Failed to initialize conformal wrapper") + self._conformal.fit(x, y) + self.fitted_ = True + return self + + def calibrate( + self, + x_calib: npt.NDArray[Any], + y_calib: npt.NDArray[Any], + **calib_params: Any, + ) -> None: + """Calibrate the conformal predictor. + + Parameters + ---------- + x_calib : npt.NDArray[Any] + Calibration features. + y_calib : npt.NDArray[Any] + Calibration targets. + calib_params : dict + Additional calibration parameters. + + Raises + ------ + RuntimeError + If not fitted before calibrating. + ValueError + For validation errors. + + """ + if not self.fitted_ or self._conformal is None: + raise RuntimeError("Estimator must be fitted before calling calibrate") + + if self.estimator_type not in {"classifier", "regressor"}: + raise ValueError("estimator_type must be 'classifier' or 'regressor'") + kwargs: dict[str, Any] = calib_params.copy() + if self.estimator_type == "classifier": + if self.nonconformity is not None: + kwargs["nc"] = self.nonconformity + if self.mondrian is True: + kwargs["class_cond"] = True + elif isinstance(self.mondrian, MondrianCategorizer) or callable( + self.mondrian, + ): + kwargs["mc"] = self.mondrian + self._conformal.calibrate(x_calib, y_calib, **kwargs) + else: # regressor + if isinstance(self.mondrian, MondrianCategorizer) or callable( + self.mondrian, + ): + kwargs["mc"] = self.mondrian + if self.difficulty_estimator is not None: + kwargs["de"] = self.difficulty_estimator + if isinstance(self.binning, MondrianCategorizer): + kwargs["mc"] = self.binning + self._conformal.calibrate(x_calib, y_calib, **kwargs) + self.calibrated_ = True + + def predict(self, x: npt.NDArray[Any]) -> npt.NDArray[Any]: + """Predict using the conformal predictor. + + Parameters + ---------- + x : npt.NDArray[Any] + Features to predict. + + Returns + ------- + npt.NDArray[Any] + Predictions. + + Raises + ------ + ValueError + If not fitted. + + """ + if not self.fitted_ or self._conformal is None: + raise ValueError("Estimator must be fitted before calling predict") + return self._conformal.predict(x) + + def predict_proba(self, x: npt.NDArray[Any]) -> npt.NDArray[Any]: + """Predict probabilities using the conformal predictor. + + Parameters + ---------- + x : npt.NDArray[Any] + Features to predict. + + Returns + ------- + npt.NDArray[Any] + Predicted probabilities. + + Raises + ------ + ValueError + If not fitted. + RuntimeError + If wrapper type is incorrect. + NotImplementedError + If called for regressor. + + """ + if not self.fitted_ or self._conformal is None: + raise ValueError("Estimator must be fitted before calling predict_proba") + if self.estimator_type != "classifier": + raise NotImplementedError("predict_proba is for classifiers only.") + if isinstance(self._conformal, WrapClassifier): + return self._conformal.predict_proba(x) + raise RuntimeError("Expected WrapClassifier but got different type") + + def predict_conformal_set( + self, + x: npt.NDArray[Any], + confidence: float | None = None, + ) -> list[list[int]]: + """Predict conformal sets. + + Parameters + ---------- + x : npt.NDArray[Any] + Features to predict. + confidence : float, optional + Confidence level. Must be in (0, 1). + + Returns + ------- + list[list[int]] + Conformal prediction sets as list of lists containing class indices. + + Raises + ------ + ValueError + If not fitted or invalid confidence. + RuntimeError + If wrapper not initialized. + NotImplementedError + If called for regressor. + + """ + if not self.fitted_: + raise ValueError( + "Estimator must be fitted and calibrated before calling predict", + ) + if self._conformal is None: + raise RuntimeError("Conformal wrapper is not initialized") + if not self.calibrated_: + raise ValueError( + "Conformal predictor must be calibrated before making predictions", + ) + if self.estimator_type != "classifier": + raise NotImplementedError( + "predict_conformal_set is only for classification.", + ) + + conf = confidence if confidence is not None else self.confidence_level + if not 0 < conf < 1: + raise ValueError(f"confidence must be in (0, 1), got {conf}") + + if isinstance(self._conformal, WrapClassifier): + prediction_sets_binary = self._conformal.predict_set(x, confidence=conf) + + prediction_sets = [] + for i in range(prediction_sets_binary.shape[0]): + class_indices = np.where(prediction_sets_binary[i, :])[0].tolist() + prediction_sets.append(class_indices) + + return prediction_sets + raise RuntimeError("Expected WrapClassifier but got different type") + + def predict_p(self, x: npt.NDArray[Any], **kwargs: Any) -> npt.NDArray[Any]: + """Predict p-values. + + Parameters + ---------- + x : npt.NDArray[Any] + Features to predict. + kwargs : dict + Additional parameters. + + Returns + ------- + npt.NDArray[Any] + p-values. + + Raises + ------ + ValueError + If not fitted or not calibrated. + RuntimeError + If wrapper not initialized. + NotImplementedError + If called for regressor. + + """ + if not self.fitted_: + raise ValueError( + "Estimator must be fitted and calibrated before calling predict_p", + ) + if self._conformal is None: + raise RuntimeError("Conformal wrapper is not initialized") + if not self.calibrated_: + raise ValueError( + "Conformal predictor must be calibrated before making predictions", + ) + if self.estimator_type != "classifier": + raise NotImplementedError("predict_p is only for classification.") + if isinstance(self._conformal, WrapClassifier): + return self._conformal.predict_p(x, **kwargs) + raise RuntimeError("Expected WrapClassifier but got different type") + + def predict_int( + self, + x: npt.NDArray[Any], + confidence: float | None = None, + ) -> npt.NDArray[Any]: + """Predict intervals. + + Parameters + ---------- + x : npt.NDArray[Any] + Features to predict. + confidence : float, optional + Confidence level. Must be in (0, 1). + + Returns + ------- + npt.NDArray[Any] + Prediction intervals of shape (n_samples, 2) with columns [lower, upper]. + + Raises + ------ + ValueError + If not fitted or invalid confidence. + RuntimeError + If wrapper not initialized. + NotImplementedError + If called for classifier. + + """ + if self.estimator_type != "regressor": + raise NotImplementedError("predict_int is only for regression.") + + if not self.fitted_: + raise ValueError( + "Estimator must be fitted and calibrated before calling predict_int", + ) + if self._conformal is None: + raise RuntimeError("Conformal wrapper is not initialized") + if not self.calibrated_: + raise ValueError( + "Conformal predictor must be calibrated before making predictions", + ) + + conf = confidence if confidence is not None else self.confidence_level + if not 0 < conf < 1: + raise ValueError(f"confidence must be in (0, 1), got {conf}") + + if isinstance(self._conformal, WrapRegressor): + return self._conformal.predict_int(x, confidence=conf) + raise RuntimeError("Expected WrapRegressor but got different type") + + def get_params(self, deep: bool = True) -> dict[str, Any]: + """Get parameters for this estimator. + + Parameters + ---------- + deep : bool, optional + If True, will return the parameters for this estimator. + + Returns + ------- + dict[str, Any] + Parameter names mapped to their values. + + """ + params = { + "estimator": self.estimator, + "mondrian": self.mondrian, + "confidence_level": self.confidence_level, + "estimator_type": self.estimator_type, + "nonconformity": self.nonconformity, + "difficulty_estimator": self.difficulty_estimator, + "binning": self.binning, + "n_jobs": self.n_jobs, + } + params.update(self.kwargs) + + if deep and hasattr(self.estimator, "get_params"): + estimator_params = self.estimator.get_params(deep=True) + params.update({f"estimator__{k}": v for k, v in estimator_params.items()}) + + return params + + def set_params(self, **params: Any) -> "ConformalPredictor": + """Set the parameters of this estimator. + + Parameters + ---------- + **params : dict + Estimator parameters. + + Returns + ------- + ConformalPredictor + This estimator. + + Raises + ------ + ValueError + If invalid parameter provided. + + """ + valid_params = self.get_params(deep=False) + estimator_params: dict[str, Any] = {} + + for key, value in params.items(): + if key.startswith("estimator__"): + nested_key = key[len("estimator__") :] + estimator_params[nested_key] = value + elif key in valid_params: + setattr(self, key, value) + else: + raise ValueError( + f"Invalid parameter {key} for estimator {type(self).__name__}. ", + ) + + if estimator_params and hasattr(self.estimator, "set_params"): + self.estimator.set_params(**estimator_params) + + return self + + +class CrossConformalPredictor(ConformalPredictor): # pylint: disable=too-many-instance-attributes + """Cross-conformal prediction using WrapClassifier/WrapRegressor.""" + + def __init__( + self, + estimator: BaseEstimator, + *, + n_folds: int = 5, + confidence_level: float = 0.9, + mondrian: MondrianCategorizer | Callable[..., Any] | bool = False, + nonconformity: ( + Callable[ + [npt.NDArray[Any], npt.NDArray[Any] | None, npt.NDArray[Any] | None], + npt.NDArray[Any], + ] + | None + ) = None, + binning: int | MondrianCategorizer | None = None, + estimator_type: Literal["classifier", "regressor", "auto"] = "auto", + n_bins: int = 10, + random_state: int | None = None, + **kwargs: Any, + ) -> None: + """Initialize CrossConformalPredictor. + + Parameters + ---------- + estimator : BaseEstimator + The base estimator or pipeline to wrap. + n_folds : int, optional + Number of cross-validation folds (default: 5). + confidence_level : float, optional + Confidence level for prediction sets/intervals (default: 0.9). + mondrian : MondrianCategorizer | Callable[..., Any] | bool, optional + Mondrian calibration/grouping (default: False). + - True: Use class-conditional calibration for classification + nonconformity : Callable, optional + Nonconformity function for classification that takes (X_prob, classes, y) + and returns non-conformity scores. Examples: hinge, margin from + crepes.extras. + binning : int | MondrianCategorizer | None, optional + Number of bins or MondrianCategorizer for Mondrian calibration (regression). + estimator_type : Literal["classifier", "regressor", "auto"], optional + Type of estimator (default: 'auto'). + n_bins : int, optional + Number of bins for stratified splitting in regression (default: 10). + random_state : int | None, optional + Random state for reproducibility. + **kwargs : Any + Additional keyword arguments for crepes. + + """ + super().__init__( + estimator=estimator, + mondrian=mondrian, + confidence_level=confidence_level, + estimator_type=estimator_type, + nonconformity=nonconformity, + difficulty_estimator=None, # Not used in cross-conformal + binning=binning, + n_jobs=1, # Not used in cross-conformal + **kwargs, + ) + + self.n_folds = n_folds + self.n_bins = n_bins + self.random_state = random_state + self.models_: list[WrapClassifier | WrapRegressor] = [] + + def _create_splitter( + self, + y: npt.NDArray[Any], + rng: Any, + ) -> tuple[KFold | StratifiedKFold, npt.NDArray[Any]]: + """Create the appropriate splitter for cross-validation. + + Parameters + ---------- + y : npt.NDArray[Any] + Target values. + rng : Any + Random state object. + + Returns + ------- + tuple[KFold | StratifiedKFold, npt.NDArray[Any]] + Splitter and y values for splitting. + + Raises + ------ + ValueError + If estimator_type is not 'classifier' or 'regressor'. + + """ + if self.estimator_type == "classifier": + splitter = StratifiedKFold( + n_splits=self.n_folds, + shuffle=True, + random_state=rng, + ) + y_split = y + elif self.estimator_type == "regressor": + splitter = KFold( + n_splits=self.n_folds, + shuffle=True, + random_state=rng, + ) + y_split = _bin_targets(y, n_bins=self.n_bins) + else: + raise ValueError("estimator_type must be 'classifier' or 'regressor'") + return splitter, y_split + + def _create_mondrian_categorizer( + self, + model: WrapRegressor, + y_calib_vals: npt.NDArray[Any], + ) -> tuple[MondrianCategorizer, Callable[..., Any]]: + """Create a MondrianCategorizer for regression binning. + + Parameters + ---------- + model : WrapRegressor + The fitted regression model. + y_calib_vals : npt.NDArray[Any] + Calibration target values. + + Returns + ------- + tuple[MondrianCategorizer, Callable[..., Any]] + Fitted MondrianCategorizer and binning function. + + """ + mc_obj = MondrianCategorizer() + y_min, y_max = np.min(y_calib_vals), np.max(y_calib_vals) + n_bins = self.binning + + def bin_func( + x_test: Any, + model: Any = model, + y_min: Any = y_min, + y_max: Any = y_max, + n_bins: Any = n_bins, + ) -> Any: + """Binning function for Mondrian categorization. + + Parameters + ---------- + x_test : Any + Test features. + model : Any, optional + Fitted model. + y_min : Any, optional + Minimum target value. + y_max : Any, optional + Maximum target value. + n_bins : Any, optional + Number of bins. + + Returns + ------- + Any + Binned predictions. + + """ + y_pred = model.predict(x_test) + bins = np.linspace(y_min, y_max, n_bins + 1) + binned = np.digitize(y_pred, bins) - 1 + return np.clip(binned, 0, n_bins - 1) + + return mc_obj, bin_func + + def _fit_single_model( + self, + x_array: npt.NDArray[Any], + y_array: npt.NDArray[Any], + train_idx: npt.NDArray[np.int_], + calib_idx: npt.NDArray[np.int_], + ) -> WrapClassifier | WrapRegressor: + """Fit and calibrate a single model for one fold. + + Parameters + ---------- + x_array : npt.NDArray[Any] + Feature array. + y_array : npt.NDArray[Any] + Target array. + train_idx : npt.NDArray[np.int_] + Training indices. + calib_idx : npt.NDArray[np.int_] + Calibration indices. + + Returns + ------- + WrapClassifier | WrapRegressor + Fitted and calibrated model. + + """ + kwargs: dict[str, Any] = {} + if self.estimator_type == "classifier": + model = WrapClassifier(clone(self.estimator)) + model.fit(x_array[train_idx], y_array[train_idx]) + + if self.nonconformity is not None: + kwargs["nc"] = self.nonconformity + if self.mondrian is True: + kwargs["class_cond"] = True + elif isinstance(self.mondrian, MondrianCategorizer) or callable( + self.mondrian, + ): + kwargs["mc"] = self.mondrian + + model.calibrate(x_array[calib_idx], y_array[calib_idx], **kwargs) + + else: # regressor + model = WrapRegressor(clone(self.estimator)) + model.fit(x_array[train_idx], y_array[train_idx]) + + if isinstance(self.mondrian, MondrianCategorizer) or callable( + self.mondrian, + ): + kwargs["mc"] = self.mondrian + + if self.binning is not None and isinstance(self.binning, int): + mc_obj, bin_func = self._create_mondrian_categorizer( + model, + y_array[calib_idx], + ) + mc_obj.fit(x_array[calib_idx], f=bin_func, no_bins=self.binning) + kwargs["mc"] = mc_obj + elif isinstance(self.binning, MondrianCategorizer): + kwargs["mc"] = self.binning + + model.calibrate(x_array[calib_idx], y_array[calib_idx], **kwargs) + + return model + + def fit( + self, + x: npt.NDArray[Any], + y: npt.NDArray[Any], + ) -> "CrossConformalPredictor": + """Fit the cross-conformal predictor. + + Parameters + ---------- + x : npt.NDArray[Any] + Training features. + y : npt.NDArray[Any] + Training targets. + + Returns + ------- + CrossConformalPredictor + Self. + + """ + self.models_ = [] + rng = check_random_state(self.random_state) + splitter, y_split = self._create_splitter(y, rng) + + x_array = np.asarray(x) + y_array = np.asarray(y) + + for train_idx, calib_idx in splitter.split(x_array, y_split): + model = self._fit_single_model(x_array, y_array, train_idx, calib_idx) + self.models_.append(model) + + self.fitted_ = True + self.calibrated_ = True # Models are calibrated during fit + return self + + def predict(self, x: npt.NDArray[Any]) -> npt.NDArray[Any]: + """Predict using the cross-conformal predictor. + + Parameters + ---------- + x : npt.NDArray[Any] + Features to predict. + + Returns + ------- + npt.NDArray[Any] + Predictions (majority vote for classification, mean for regression). + + Raises + ------ + ValueError + If estimator must be fitted before calling predict. + + """ + if not self.fitted_: + raise ValueError("Estimator must be fitted before calling predict") + + if self.estimator_type == "classifier": + result = np.array([m.predict(x) for m in self.models_]) + pred_mode = mode(result, axis=0, keepdims=False) + return np.ravel(pred_mode.mode) + result = np.array([m.predict(x) for m in self.models_]) + return np.mean(result, axis=0) + + def predict_proba(self, x: npt.NDArray[Any]) -> npt.NDArray[Any]: + """Predict probabilities using the cross-conformal predictor. + + Parameters + ---------- + x : npt.NDArray[Any] + Features to predict. + + Returns + ------- + npt.NDArray[Any] + Predicted probabilities (averaged). + + Raises + ------ + ValueError + If estimator must be fitted before calling predict_proba. + NotImplementedError + If called for a regressor. + + """ + if not self.fitted_: + raise ValueError("Estimator must be fitted before calling predict_proba") + if self.estimator_type != "classifier": + raise NotImplementedError("predict_proba is for classifiers only.") + result = np.array([m.predict_proba(x) for m in self.models_]) + return np.atleast_2d(np.mean(result, axis=0)) + + def predict_conformal_set( + self, + x: npt.NDArray[Any], + confidence: float | None = None, + ) -> list[list[int]]: + """Predict conformal sets using the cross-conformal predictor. + + Parameters + ---------- + x : npt.NDArray[Any] + Features to predict. + confidence : float, optional + Confidence level. Must be in (0, 1). + + Returns + ------- + list[list[int]] + Conformal prediction sets as list of lists containing class indices. + + Raises + ------ + ValueError + If estimator must be fitted before calling predict_conformal_set. + NotImplementedError + If called for a regressor. + + """ + if not self.fitted_: + raise ValueError( + "Estimator must be fitted before calling predict_conformal_set", + ) + if self.estimator_type != "classifier": + raise NotImplementedError( + "predict_conformal_set is only for classification.", + ) + + conf = confidence if confidence is not None else self.confidence_level + if not 0 < conf < 1: + raise ValueError(f"confidence must be in (0, 1), got {conf}") + + p_values_list = [m.predict_p(x) for m in self.models_] + aggregated_p_values = np.mean(p_values_list, axis=0) + + prediction_sets_binary = (aggregated_p_values >= 1 - conf).astype(int) + + prediction_sets = [] + for i in range(prediction_sets_binary.shape[0]): + class_indices = np.where(prediction_sets_binary[i, :])[0].tolist() + prediction_sets.append(class_indices) + + return prediction_sets + + def predict_p(self, x: npt.NDArray[Any], **kwargs: Any) -> npt.NDArray[Any]: + """Predict p-values using the cross-conformal predictor. + + Parameters + ---------- + x : npt.NDArray[Any] + Features to predict. + kwargs : dict + Additional parameters. + + Returns + ------- + npt.NDArray[Any] + Aggregated p-values from all folds. + + Raises + ------ + ValueError + If estimator must be fitted before calling predict_p. + NotImplementedError + If called for a regressor. + + """ + if not self.fitted_: + raise ValueError("Estimator must be fitted before calling predict_p") + if self.estimator_type != "classifier": + raise NotImplementedError("predict_p is only for classification.") + + p_values_list = [m.predict_p(x, **kwargs) for m in self.models_] + return np.mean(p_values_list, axis=0) + + def predict_int( + self, + x: npt.NDArray[Any], + confidence: float | None = None, + ) -> npt.NDArray[Any]: + """Predict intervals using the cross-conformal predictor. + + Parameters + ---------- + x : npt.NDArray[Any] + Features to predict. + confidence : float, optional + Confidence level. Must be in (0, 1). + + Returns + ------- + npt.NDArray[Any] + Prediction intervals based on aggregated predictions. + + Raises + ------ + ValueError + If estimator must be fitted before calling predict_int + or if confidence is not in valid range. + NotImplementedError + If called for a classifier. + + """ + if not self.fitted_: + raise ValueError("Estimator must be fitted before calling predict_int") + if self.estimator_type != "regressor": + raise NotImplementedError("predict_int is only for regression.") + + conf = confidence if confidence is not None else self.confidence_level + if not 0 < conf < 1: + raise ValueError(f"confidence must be in (0, 1), got {conf}") + + intervals_list = [m.predict_int(x, confidence=conf) for m in self.models_] + + intervals_array = np.array(intervals_list) # shape: (n_folds, n_samples, 2) + lower_bounds = np.mean(intervals_array[:, :, 0], axis=0) + upper_bounds = np.mean(intervals_array[:, :, 1], axis=0) + + return np.column_stack([lower_bounds, upper_bounds]) + + def get_params(self, deep: bool = True) -> dict[str, Any]: + """Get parameters for this estimator. + + Parameters + ---------- + deep : bool, optional + If True, will return the parameters for this estimator. + + Returns + ------- + dict[str, Any] + Parameter names mapped to their values. + + """ + params = super().get_params(deep=deep) + + cross_params = { + "n_folds": self.n_folds, + "n_bins": self.n_bins, + "random_state": self.random_state, + } + params.update(cross_params) + + return params + + def set_params(self, **params: Any) -> "CrossConformalPredictor": + """Set the parameters of this estimator. + + Parameters + ---------- + **params : dict + Estimator parameters. + + Returns + ------- + CrossConformalPredictor + This estimator. + + Raises + ------ + ValueError + If invalid parameter provided. + + """ + valid_params = self.get_params(deep=False) + estimator_params: dict[str, Any] = {} + + for key, value in params.items(): + if key.startswith("estimator__"): + nested_key = key[len("estimator__") :] + estimator_params[nested_key] = value + elif key in valid_params: + setattr(self, key, value) + else: + raise ValueError( + f"Invalid parameter {key} for estimator {type(self).__name__}. ", + ) + + if estimator_params and hasattr(self.estimator, "set_params"): + self.estimator.set_params(**estimator_params) + + return self diff --git a/molpipeline/experimental/uncertainty/utils.py b/molpipeline/experimental/uncertainty/utils.py new file mode 100644 index 00000000..3d9df3b4 --- /dev/null +++ b/molpipeline/experimental/uncertainty/utils.py @@ -0,0 +1,61 @@ +"""Conformal prediction utils.""" + +from typing import Any, Literal + +import numpy as np +import numpy.typing as npt +from sklearn.base import BaseEstimator, is_classifier, is_regressor + + +def _bin_targets(y: npt.NDArray[Any], n_bins: int = 10) -> npt.NDArray[np.int_]: + """Bin continuous targets for stratified splitting in regression. + + Parameters + ---------- + y : npt.NDArray[Any] + Continuous target values to bin. + n_bins : int, default=10 + Number of bins to create. + + Returns + ------- + npt.NDArray[np.int_] + Binned targets as integer indices. + + """ + y = np.asarray(y) + bins = np.linspace(np.min(y), np.max(y), n_bins + 1) + y_binned = np.digitize(y, bins) - 1 + y_binned[y_binned == n_bins] = n_bins - 1 + return y_binned + + +def _detect_estimator_type( + estimator: BaseEstimator, +) -> Literal["classifier", "regressor"]: + """Automatically detect whether an estimator is a classifier or regressor. + + Parameters + ---------- + estimator : BaseEstimator + The sklearn estimator to check. + + Returns + ------- + Literal["classifier", "regressor"] + The detected estimator type. + + Raises + ------ + ValueError + If type cannot be determined. + + """ + if is_classifier(estimator): + return "classifier" + if is_regressor(estimator): + return "regressor" + raise ValueError( + f"Could not determine if {type(estimator).__name__} is a " + "classifier or regressor. Please specify estimator_type explicitly.", + ) diff --git a/notebooks/advanced_04_conformal_prediction.ipynb b/notebooks/advanced_04_conformal_prediction.ipynb new file mode 100644 index 00000000..828a8bba --- /dev/null +++ b/notebooks/advanced_04_conformal_prediction.ipynb @@ -0,0 +1,965 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "9288a05c", + "metadata": {}, + "source": [ + "\n", + "# Real-World Example: Conformal Prediction on Renin Inhibitor Data\n", + "\n", + "This notebook demonstrates robust benchmarking of conformal prediction (CP) methods on a real molecular dataset (`renin_harren.csv`). We compare CP to standard uncertainty quantification (UQ) methods and ML models, using advanced metrics. All steps are NaN-safe and ready for direct use.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "ab2b079b", + "metadata": {}, + "outputs": [ + { + "ename": "ModuleNotFoundError", + "evalue": "No module named 'matplotlib'", + "output_type": "error", + "traceback": [ + "\u001b[31m---------------------------------------------------------------------------\u001b[39m", + "\u001b[31mModuleNotFoundError\u001b[39m Traceback (most recent call last)", + "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[6]\u001b[39m\u001b[32m, line 2\u001b[39m\n\u001b[32m 1\u001b[39m \u001b[38;5;66;03m# 1. Import Required Libraries and Define Utility Functions\u001b[39;00m\n\u001b[32m----> \u001b[39m\u001b[32m2\u001b[39m \u001b[38;5;28;01mimport\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01mmatplotlib\u001b[39;00m\u001b[34;01m.\u001b[39;00m\u001b[34;01mpyplot\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mas\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01mplt\u001b[39;00m\n\u001b[32m 3\u001b[39m \u001b[38;5;28;01mimport\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01mnumpy\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mas\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01mnp\u001b[39;00m\n\u001b[32m 4\u001b[39m \u001b[38;5;28;01mimport\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01mpandas\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[38;5;28;01mas\u001b[39;00m\u001b[38;5;250m \u001b[39m\u001b[34;01mpd\u001b[39;00m\n", + "\u001b[31mModuleNotFoundError\u001b[39m: No module named 'matplotlib'" + ] + } + ], + "source": [ + "# 1. Import Required Libraries and Define Utility Functions\n", + "import matplotlib.pyplot as plt\n", + "import numpy as np\n", + "import pandas as pd\n", + "from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor\n", + "from sklearn.metrics import (\n", + " average_precision_score,\n", + " balanced_accuracy_score,\n", + " brier_score_loss,\n", + " f1_score,\n", + " log_loss,\n", + " matthews_corrcoef,\n", + " roc_auc_score,\n", + ")\n", + "from sklearn.model_selection import StratifiedKFold, train_test_split\n", + "\n", + "from molpipeline.any2mol import SmilesToMol\n", + "from molpipeline.error_handling import ErrorFilter, FilterReinserter\n", + "from molpipeline.experimental.uncertainty.conformal import (\n", + " CrossConformalCV,\n", + ")\n", + "from molpipeline.mol2any.mol2morgan_fingerprint import MolToMorganFP\n", + "from molpipeline.pipeline import Pipeline\n", + "from molpipeline.post_prediction import PostPredictionWrapper\n", + "\n", + "THRESHOLD = 0.5\n", + "\n", + "\n", + "def compute_ece(y_true: np.ndarray, probs: np.ndarray, n_bins: int = 10) -> float:\n", + " \"\"\"Compute Expected Calibration Error (ECE).\n", + "\n", + " Parameters\n", + " ----------\n", + " y_true : np.ndarray\n", + " True binary labels.\n", + " probs : np.ndarray\n", + " Predicted probabilities.\n", + " n_bins : int, optional\n", + " Number of bins (default: 10).\n", + "\n", + " Returns\n", + " -------\n", + " float\n", + " Expected calibration error.\n", + "\n", + " \"\"\"\n", + " bins = np.linspace(0, 1, n_bins + 1)\n", + " binids = np.digitize(probs, bins) - 1\n", + " ece = 0.0\n", + " for i in range(n_bins):\n", + " mask = binids == i\n", + " if np.any(mask):\n", + " acc = np.mean(y_true[mask] == (probs[mask] >= THRESHOLD))\n", + " conf = np.mean(probs[mask])\n", + " ece += np.abs(acc - conf) * np.sum(mask) / len(y_true)\n", + " return ece\n", + "\n", + "\n", + "def uncertain_error_corr(y_true: np.ndarray, probs: np.ndarray) -> float:\n", + " \"\"\"Compute correlation between uncertainty and error.\n", + "\n", + " Parameters\n", + " ----------\n", + " y_true : np.ndarray\n", + " True binary labels.\n", + " probs : np.ndarray\n", + " Predicted probabilities.\n", + "\n", + " Returns\n", + " -------\n", + " float\n", + " Correlation coefficient between entropy and error.\n", + "\n", + " \"\"\"\n", + " eps = 1e-12\n", + " entropy = -probs * np.log(probs + eps) - (1 - probs) * np.log(1 - probs + eps)\n", + " error = np.abs(y_true - (probs >= THRESHOLD))\n", + " return np.corrcoef(entropy, error)[0, 1]\n", + "\n", + "\n", + "def compute_sharpness(probs: np.ndarray) -> float:\n", + " \"\"\"Compute sharpness (mean entropy) of predicted probabilities.\n", + "\n", + " Parameters\n", + " ----------\n", + " probs : np.ndarray\n", + " Predicted probabilities.\n", + "\n", + " Returns\n", + " -------\n", + " float\n", + " Mean entropy of predicted probabilities.\n", + "\n", + " \"\"\"\n", + " eps = 1e-12\n", + " entropy = -probs * np.log(probs + eps) - (1 - probs) * np.log(1 - probs + eps)\n", + " return np.mean(entropy)" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "c2281174", + "metadata": {}, + "outputs": [ + { + "ename": "NameError", + "evalue": "name 'pd' is not defined", + "output_type": "error", + "traceback": [ + "\u001b[31m---------------------------------------------------------------------------\u001b[39m", + "\u001b[31mNameError\u001b[39m Traceback (most recent call last)", + "\u001b[36mCell\u001b[39m\u001b[36m \u001b[39m\u001b[32mIn[2]\u001b[39m\u001b[32m, line 3\u001b[39m\n\u001b[32m 1\u001b[39m \u001b[38;5;66;03m# 2. Data Loading, Cleaning, and Featurization\u001b[39;00m\n\u001b[32m 2\u001b[39m \u001b[38;5;66;03m# Load real data\u001b[39;00m\n\u001b[32m----> \u001b[39m\u001b[32m3\u001b[39m df = \u001b[43mpd\u001b[49m.read_csv(\u001b[33m\"\u001b[39m\u001b[33mexample_data/renin_harren.csv\u001b[39m\u001b[33m\"\u001b[39m)\n\u001b[32m 4\u001b[39m smiles = df[\u001b[33m\"\u001b[39m\u001b[33mpubchem_smiles\u001b[39m\u001b[33m\"\u001b[39m].to_numpy()\n\u001b[32m 5\u001b[39m y_reg = df[\u001b[33m\"\u001b[39m\u001b[33mpIC50\u001b[39m\u001b[33m\"\u001b[39m].to_numpy()\n", + "\u001b[31mNameError\u001b[39m: name 'pd' is not defined" + ] + } + ], + "source": [ + "# 2. Data Loading, Cleaning, and Featurization\n", + "# Load real data\n", + "df = pd.read_csv(\"example_data/renin_harren.csv\")\n", + "smiles = df[\"pubchem_smiles\"].to_numpy()\n", + "y_reg = df[\"pIC50\"].to_numpy()\n", + "\n", + "# Binarize for classification: top 20% as 'active'\n", + "threshold = np.nanquantile(y_reg, 0.8)\n", + "y_class = (y_reg >= threshold).astype(int)\n", + "\n", + "# Featurization pipeline (NaN-safe)\n", + "error_filter = ErrorFilter(filter_everything=True)\n", + "error_replacer = FilterReinserter.from_error_filter(error_filter, fill_value=np.nan)\n", + "featurizer = Pipeline(\n", + " [\n", + " (\"smi2mol\", SmilesToMol()),\n", + " (\"error_filter\", error_filter),\n", + " (\"morgan\", MolToMorganFP(radius=2, n_bits=256, return_as=\"dense\")),\n", + " (\"error_replacer\", PostPredictionWrapper(error_replacer)),\n", + " ],\n", + " n_jobs=1,\n", + ")\n", + "X_feat = featurizer.transform(smiles)\n", + "\n", + "print(f\"Shape of X={X_feat.shape}, y_class={y_class.shape}, y_reg={y_reg.shape}\")\n", + "\n", + "# Generate indices for a single split\n", + "indices = np.arange(len(y_class))\n", + "train_idx, test_idx = train_test_split(\n", + " indices,\n", + " test_size=0.3,\n", + " random_state=42,\n", + " stratify=y_class,\n", + ")\n", + "\n", + "# Use these indices for all splits\n", + "X_train, X_test = X_feat[train_idx], X_feat[test_idx]\n", + "y_train, y_test = y_class[train_idx], y_class[test_idx]\n", + "smiles_train, smiles_test = smiles[train_idx], smiles[test_idx]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e4b28946", + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "40f1540a", + "metadata": {}, + "outputs": [], + "source": [ + "# 3.1 Cross-Validation Benchmarking: Standard Models and Conformal Prediction\n", + "\n", + "# Use StratifiedKFold on the training set\n", + "skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)\n", + "\n", + "model_dict = {\n", + " \"ensemble_rf\": RandomForestClassifier(n_estimators=100, random_state=42),\n", + "}\n", + "metrics_list = [\n", + " \"NLL\",\n", + " \"ECE\",\n", + " \"Brier\",\n", + " \"Uncertainty Error Correlation\",\n", + " \"Sharpness\",\n", + " \"Balanced Accuracy\",\n", + " \"AUROC\",\n", + " \"AUPRC\",\n", + " \"F1 Score\",\n", + " \"MCC\",\n", + "]\n", + "results = []\n", + "results_cp = []\n", + "\n", + "# Arrays to collect out-of-fold predictions\n", + "oof_preds = np.zeros_like(y_train, dtype=float)\n", + "oof_preds_cp_norm = np.zeros_like(y_train, dtype=float)\n", + "oof_preds_cp_raw = np.zeros_like(y_train, dtype=float)\n", + "\n", + "for fold, (train_idx, val_idx) in enumerate(skf.split(X_train, y_train)):\n", + " print(f\"Fold {fold + 1}\")\n", + " X_tr, X_val = X_train[train_idx], X_train[val_idx]\n", + " y_tr, y_val = y_train[train_idx], y_train[val_idx]\n", + " smiles_tr, smiles_val = smiles_train[train_idx], smiles_train[val_idx]\n", + "\n", + " # --- Standard Model ---\n", + " for model in model_dict.values():\n", + " model.fit(X_tr, y_tr)\n", + " prob = model.predict_proba(X_val)[:, 1]\n", + " oof_preds[val_idx] = prob\n", + "\n", + " # --- Conformal Prediction (CrossConformalCV) ---\n", + " rf = RandomForestClassifier(n_estimators=100, random_state=42)\n", + " rf_pipeline = Pipeline(\n", + " [\n", + " (\"featurizer\", featurizer),\n", + " (\"rf\", rf),\n", + " ],\n", + " n_jobs=1,\n", + " )\n", + " cc_clf = CrossConformalCV(\n", + " estimator=rf_pipeline,\n", + " n_folds=5,\n", + " confidence_level=0.9,\n", + " estimator_type=\"classifier\",\n", + " )\n", + " cc_clf.fit(smiles_tr, y_tr)\n", + " # Average ensemble probabilities for the validation fold\n", + " probs_cp_ensemble = np.mean(\n", + " [m.predict_p(smiles_val) for m in cc_clf.models_], axis=0\n", + " )\n", + " probs_cp_ensemble_raw = np.mean(\n", + " [m.predict_proba(smiles_val) for m in cc_clf.models_], axis=0\n", + " )\n", + " p0 = probs_cp_ensemble[:, 0]\n", + " p1 = probs_cp_ensemble[:, 1]\n", + " p1_norm = p1 / (p0 + p1 + 1e-12)\n", + " oof_preds_cp_norm[val_idx] = p1_norm\n", + " oof_preds_cp_raw[val_idx] = probs_cp_ensemble_raw[:, 1]\n", + "\n", + "# Create a DataFrame to compare raw and normalized conformal probabilities\n", + "df_oof_compare = pd.DataFrame(\n", + " {\n", + " \"y_true\": y_train,\n", + " \"StandardModel\": oof_preds,\n", + " \"ConformalRaw\": oof_preds_cp_raw,\n", + " \"ConformalNorm\": oof_preds_cp_norm,\n", + " }\n", + ")\n", + "\n", + "# Compute metrics for out-of-fold predictions (standard model)\n", + "mean_pred = (oof_preds >= THRESHOLD).astype(int)\n", + "metrics = {\n", + " \"Model\": \"ensemble_xgb (OOF)\",\n", + " \"NLL\": log_loss(y_train, oof_preds),\n", + " \"ECE\": compute_ece(y_train, oof_preds),\n", + " \"Brier\": brier_score_loss(y_train, oof_preds),\n", + " \"Uncertainty Error Correlation\": uncertain_error_corr(y_train, oof_preds),\n", + " \"Sharpness\": compute_sharpness(oof_preds),\n", + " \"Balanced Accuracy\": balanced_accuracy_score(y_train, mean_pred),\n", + " \"AUROC\": roc_auc_score(y_train, oof_preds),\n", + " \"AUPRC\": average_precision_score(y_train, oof_preds),\n", + " \"F1 Score\": f1_score(y_train, mean_pred),\n", + " \"MCC\": matthews_corrcoef(y_train, mean_pred),\n", + "}\n", + "results.append(metrics)\n", + "\n", + "# Compute metrics for out-of-fold predictions (conformal, both raw and norm)\n", + "mean_pred_cp_norm = (oof_preds_cp_norm >= THRESHOLD).astype(int)\n", + "metrics_cp_norm = {\n", + " \"Model\": \"CrossConformalCV (OOF, norm)\",\n", + " \"NLL\": log_loss(y_train, oof_preds_cp_norm),\n", + " \"ECE\": compute_ece(y_train, oof_preds_cp_norm),\n", + " \"Brier\": brier_score_loss(y_train, oof_preds_cp_norm),\n", + " \"Uncertainty Error Correlation\": uncertain_error_corr(y_train, oof_preds_cp_norm),\n", + " \"Sharpness\": compute_sharpness(oof_preds_cp_norm),\n", + " \"Balanced Accuracy\": balanced_accuracy_score(y_train, mean_pred_cp_norm),\n", + " \"AUROC\": roc_auc_score(y_train, oof_preds_cp_norm),\n", + " \"AUPRC\": average_precision_score(y_train, oof_preds_cp_norm),\n", + " \"F1 Score\": f1_score(y_train, mean_pred_cp_norm),\n", + " \"MCC\": matthews_corrcoef(y_train, mean_pred_cp_norm),\n", + "}\n", + "results_cp.append(metrics_cp_norm)\n", + "\n", + "mean_pred_cp_raw = (oof_preds_cp_raw >= THRESHOLD).astype(int)\n", + "metrics_cp_raw = {\n", + " \"Model\": \"CrossConformalCV (OOF, raw)\",\n", + " \"NLL\": log_loss(y_train, oof_preds_cp_raw),\n", + " \"ECE\": compute_ece(y_train, oof_preds_cp_raw),\n", + " \"Brier\": brier_score_loss(y_train, oof_preds_cp_raw),\n", + " \"Uncertainty Error Correlation\": uncertain_error_corr(y_train, oof_preds_cp_raw),\n", + " \"Sharpness\": compute_sharpness(oof_preds_cp_raw),\n", + " \"Balanced Accuracy\": balanced_accuracy_score(y_train, mean_pred_cp_raw),\n", + " \"AUROC\": roc_auc_score(y_train, oof_preds_cp_raw),\n", + " \"AUPRC\": average_precision_score(y_train, oof_preds_cp_raw),\n", + " \"F1 Score\": f1_score(y_train, mean_pred_cp_raw),\n", + " \"MCC\": matthews_corrcoef(y_train, mean_pred_cp_raw),\n", + "}\n", + "results_cp.append(metrics_cp_raw)\n", + "\n", + "results_df = pd.DataFrame(results + results_cp).set_index(\"Model\").T\n", + "display(results_df)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "ad5d684e", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "629b1099", + "metadata": {}, + "outputs": [], + "source": [ + "import numpy as np\n", + "\n", + "probs_standard = df_oof_compare[\"StandardModel\"].to_numpy()\n", + "probs_raw = df_oof_compare[\"ConformalRaw\"].to_numpy()\n", + "probs_norm = df_oof_compare[\"ConformalNorm\"].to_numpy()\n", + "\n", + "plt.figure(figsize=(8, 5))\n", + "bins = np.linspace(0, 1, 21)\n", + "\n", + "\n", + "def plot_percentage_line(\n", + " probs: np.ndarray, bins: np.ndarray, label: str, color: str\n", + ") -> None:\n", + " \"\"\"Plot percentage of predictions in each probability bin.\"\"\"\n", + " counts, bin_edges = np.histogram(probs, bins=bins)\n", + " percent = 100 * counts / len(probs)\n", + " bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2\n", + " plt.plot(bin_centers, percent, marker=\"o\", label=label, color=color, linewidth=2)\n", + "\n", + "\n", + "plot_percentage_line(probs_standard, bins, \"Standard Model\", \"tab:blue\")\n", + "plot_percentage_line(probs_raw, bins, \"Conformal Raw\", \"tab:orange\")\n", + "plot_percentage_line(probs_norm, bins, \"Conformal Norm\", \"tab:green\")\n", + "\n", + "plt.xlabel(\"Predicted Probability\")\n", + "plt.ylabel(\"Percentage (%)\")\n", + "plt.title(\"Percentage of Predicted Probabilities in Each Bin (OOF)\")\n", + "plt.legend()\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2bcaf7d7", + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "application/vnd.microsoft.datawrangler.viewer.v0+json": { + "columns": [ + { + "name": "index", + "rawType": "int64", + "type": "integer" + }, + { + "name": "SMILES", + "rawType": "object", + "type": "string" + }, + { + "name": "p0", + "rawType": "float64", + "type": "float" + }, + { + "name": "p1", + "rawType": "float64", + "type": "float" + }, + { + "name": "p1_norm", + "rawType": "float64", + "type": "float" + }, + { + "name": "conformal_set", + "rawType": "object", + "type": "unknown" + }, + { + "name": "true_label", + "rawType": "int64", + "type": "integer" + } + ], + "ref": "c2eec36b-ea3e-40f3-94f4-dd7ee17654c1", + "rows": [ + [ + "0", + "CC1=CC=CC=C1OC2=C(C3=C(N2C4=CC=CC=C4)N=CC=C3)C(=O)N5CCNCC5", + "0.8524618705697028", + "0.034328378006639994", + "0.038710820356574985", + "[0, 1]", + "0" + ], + [ + "1", + "C1CCN(CC1)C2=C(C3=CC=CC=C3N2C4=CC=CC=C4)C(=O)N5CCNCC5", + "0.6324145223238109", + "0.06271762947875074", + "0.09022403771142816", + "[0, 1]", + "0" + ], + [ + "2", + "CC1=C(C=CC=C1F)CC2=C(C3=C(N2C4=CC=CC=C4)C=C(C=C3)O)C(=O)N5CCNCC5", + "0.21540320870091612", + "0.3071940554024495", + "0.5878217826664724", + "[1]", + "1" + ], + [ + "3", + "C1CN(CCN1)C(=O)C2=C(N(C3=C2N=CC=C3)C4=CC=CC=C4)CC5=C(C(=CC=C5)F)F", + "0.3248329740540647", + "0.24656932448754013", + "0.43151615790911185", + "[1]", + "0" + ], + [ + "4", + "CC1=C(C=CC=C1F)CC2=C(C3=CN=C(C=C3N2C4CCCCC4)OC)C(=O)N5CCNC(C5)CO", + "0.6475921388608785", + "0.059184034522427174", + "0.08373801601012118", + "[0, 1]", + "0" + ] + ], + "shape": { + "columns": 6, + "rows": 5 + } + }, + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
SMILESp0p1p1_normconformal_settrue_label
0CC1=CC=CC=C1OC2=C(C3=C(N2C4=CC=CC=C4)N=CC=C3)C...0.8524620.0343280.038711[0, 1]0
1C1CCN(CC1)C2=C(C3=CC=CC=C3N2C4=CC=CC=C4)C(=O)N...0.6324150.0627180.090224[0, 1]0
2CC1=C(C=CC=C1F)CC2=C(C3=C(N2C4=CC=CC=C4)C=C(C=...0.2154030.3071940.587822[1]1
3C1CN(CCN1)C(=O)C2=C(N(C3=C2N=CC=C3)C4=CC=CC=C4...0.3248330.2465690.431516[1]0
4CC1=C(C=CC=C1F)CC2=C(C3=CN=C(C=C3N2C4CCCCC4)OC...0.6475920.0591840.083738[0, 1]0
\n", + "
" + ], + "text/plain": [ + " SMILES p0 p1 \\\n", + "0 CC1=CC=CC=C1OC2=C(C3=C(N2C4=CC=CC=C4)N=CC=C3)C... 0.852462 0.034328 \n", + "1 C1CCN(CC1)C2=C(C3=CC=CC=C3N2C4=CC=CC=C4)C(=O)N... 0.632415 0.062718 \n", + "2 CC1=C(C=CC=C1F)CC2=C(C3=C(N2C4=CC=CC=C4)C=C(C=... 0.215403 0.307194 \n", + "3 C1CN(CCN1)C(=O)C2=C(N(C3=C2N=CC=C3)C4=CC=CC=C4... 0.324833 0.246569 \n", + "4 CC1=C(C=CC=C1F)CC2=C(C3=CN=C(C=C3N2C4CCCCC4)OC... 0.647592 0.059184 \n", + "\n", + " p1_norm conformal_set true_label \n", + "0 0.038711 [0, 1] 0 \n", + "1 0.090224 [0, 1] 0 \n", + "2 0.587822 [1] 1 \n", + "3 0.431516 [1] 0 \n", + "4 0.083738 [0, 1] 0 " + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Conformal set coverage: 0.833\n", + "Conformal set average size: 1.667\n", + "Conformal set error: 0.167\n", + "Fraction of empty sets: 0.000\n", + "NLL: 0.4559115484789948\n", + "Brier: 0.15137191487078414\n", + "AUROC: 0.7643097643097643\n", + "F1: 0.42857142857142855\n", + "MCC: 0.34555798270379956\n" + ] + } + ], + "source": [ + "# 3.3 Visualizing Uncertainty and Prediction Sets\n", + "\n", + "plt.figure(figsize=(8, 4))\n", + "plt.hist(p1, bins=20, alpha=0.7, label=\"Best Ensemble Model Probabilities\")\n", + "plt.xlabel(\"Predicted Probability (Active)\")\n", + "plt.ylabel(\"Count\")\n", + "plt.legend()\n", + "plt.title(\"Predicted Probabilities Distribution\")\n", + "plt.show()\n", + "\n", + "\n", + "# Get conformal prediction sets (list of sets per sample)\n", + "conf_pred_sets = cc_clf.predict_conformal_set(smiles_test, confidence=0.9)\n", + "\n", + "# Get p-values for each class (p0, p1)\n", + "p_vals = cc_clf.models_[0].predict_p(smiles_test)\n", + "if hasattr(cc_clf, \"models_\") and len(cc_clf.models_) > 1:\n", + " p_vals = np.mean([m.predict_p(smiles_test) for m in cc_clf.models_], axis=0)\n", + "\n", + "p0 = p_vals[:, 0]\n", + "p1 = p_vals[:, 1]\n", + "p1_norm = p1 / (p0 + p1 + 1e-12)\n", + "\n", + "df_cp_class = pd.DataFrame(\n", + " {\n", + " \"SMILES\": smiles_test,\n", + " \"p0\": p0,\n", + " \"p1\": p1,\n", + " \"p1_norm\": p1_norm,\n", + " \"conformal_set\": conf_pred_sets,\n", + " \"true_label\": y_test,\n", + " }\n", + ")\n", + "display(df_cp_class.head())\n", + "\n", + "\n", + "def coverage_and_set_size(y_true: np.ndarray, conf_sets: list) -> tuple[float, float]:\n", + " \"\"\"Compute coverage and average set size for conformal sets.\n", + "\n", + " Returns\n", + " -------\n", + " float, float\n", + " Coverage (fraction of true labels in sets) and average set size.\n", + "\n", + " \"\"\"\n", + " covered = [y in s for y, s in zip(y_true, conf_sets, strict=True)]\n", + " avg_size = np.mean([len(s) for s in conf_sets])\n", + " return np.mean(covered), avg_size\n", + "\n", + "\n", + "coverage, avg_set_size = coverage_and_set_size(y_test, conf_pred_sets)\n", + "error = 1 - coverage\n", + "empty = np.mean([len(s) == 0 for s in conf_pred_sets])\n", + "\n", + "print(f\"Conformal set coverage: {coverage:.3f}\")\n", + "print(f\"Conformal set average size: {avg_set_size:.3f}\")\n", + "print(f\"Conformal set error: {error:.3f}\")\n", + "print(f\"Fraction of empty sets: {empty:.3f}\")\n", + "print(\"NLL:\", log_loss(y_test, p1_norm))\n", + "print(\"Brier:\", brier_score_loss(y_test, p1_norm))\n", + "print(\"AUROC:\", roc_auc_score(y_test, p1_norm))\n", + "print(\"F1:\", f1_score(y_test, (p1_norm >= THRESHOLD).astype(int)))\n", + "print(\"MCC:\", matthews_corrcoef(y_test, (p1_norm >= THRESHOLD).astype(int)))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6cd8a8da", + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.microsoft.datawrangler.viewer.v0+json": { + "columns": [ + { + "name": "index", + "rawType": "int64", + "type": "integer" + }, + { + "name": "pubchem_smiles", + "rawType": "object", + "type": "string" + }, + { + "name": "pIC50", + "rawType": "float64", + "type": "float" + }, + { + "name": "pred_lower", + "rawType": "float64", + "type": "float" + }, + { + "name": "pred_upper", + "rawType": "float64", + "type": "float" + }, + { + "name": "point_pred", + "rawType": "float64", + "type": "float" + } + ], + "ref": "f965cae9-1066-4502-88ff-4d9ec7c9226a", + "rows": [ + [ + "0", + "CC1=C(C=C(C=C1)F)OC2=C(C3=C(N2C4=CC=CC=C4)N=CC=C3)C(=O)N5CCNCC5", + "6.4023", + "4.701805199999997", + "8.831095599999994", + "6.766450399999995" + ], + [ + "1", + "C1CN(CCN1)C(=O)C2=C(N(C3=C2C=CN=C3)C4=CC=CC=C4)CC5=CC=CC=C5", + "6.1186", + "3.9571802", + "8.086470599999998", + "6.021825399999999" + ], + [ + "2", + "CC1=C(C=CC=C1F)CC2=C(C3=C(N2C4=CC=CC=C4)N=CC(=C3)O)C(=O)N5CCNCC5", + "8.2218", + "5.641988400000004", + "9.771278800000003", + "7.7066336000000035" + ], + [ + "3", + "C1CN(CCN1)C(=O)C2=C(N(C3=CC=CC=C32)C4=CC=CC=C4)CC5=C(C=CC=C5Cl)F", + "7.7447", + "4.515626999999999", + "8.644917399999997", + "6.580272199999999" + ], + [ + "4", + "CC1=C(C=CC=C1F)CC2=C(C3=CNC(=O)C=C3N2C4CCCCC4)C(=O)N5CCNCC5", + "6.9355", + "4.9534574", + "9.082747799999998", + "7.018102599999999" + ] + ], + "shape": { + "columns": 5, + "rows": 5 + } + }, + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
pubchem_smilespIC50pred_lowerpred_upperpoint_pred
0CC1=C(C=C(C=C1)F)OC2=C(C3=C(N2C4=CC=CC=C4)N=CC...6.40234.7018058.8310966.766450
1C1CN(CCN1)C(=O)C2=C(N(C3=C2C=CN=C3)C4=CC=CC=C4...6.11863.9571808.0864716.021825
2CC1=C(C=CC=C1F)CC2=C(C3=C(N2C4=CC=CC=C4)N=CC(=...8.22185.6419889.7712797.706634
3C1CN(CCN1)C(=O)C2=C(N(C3=CC=CC=C32)C4=CC=CC=C4...7.74474.5156278.6449176.580272
4CC1=C(C=CC=C1F)CC2=C(C3=CNC(=O)C=C3N2C4CCCCC4)...6.93554.9534579.0827487.018103
\n", + "
" + ], + "text/plain": [ + " pubchem_smiles pIC50 pred_lower \\\n", + "0 CC1=C(C=C(C=C1)F)OC2=C(C3=C(N2C4=CC=CC=C4)N=CC... 6.4023 4.701805 \n", + "1 C1CN(CCN1)C(=O)C2=C(N(C3=C2C=CN=C3)C4=CC=CC=C4... 6.1186 3.957180 \n", + "2 CC1=C(C=CC=C1F)CC2=C(C3=C(N2C4=CC=CC=C4)N=CC(=... 8.2218 5.641988 \n", + "3 C1CN(CCN1)C(=O)C2=C(N(C3=CC=CC=C32)C4=CC=CC=C4... 7.7447 4.515627 \n", + "4 CC1=C(C=CC=C1F)CC2=C(C3=CNC(=O)C=C3N2C4CCCCC4)... 6.9355 4.953457 \n", + "\n", + " pred_upper point_pred \n", + "0 8.831096 6.766450 \n", + "1 8.086471 6.021825 \n", + "2 9.771279 7.706634 \n", + "3 8.644917 6.580272 \n", + "4 9.082748 7.018103 " + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Interval coverage: 1.000\n", + "Average interval width: 4.129\n", + "MAE (point prediction): 0.662\n" + ] + } + ], + "source": [ + "# 4. Regression: Conformal Prediction and Interval Evaluation\n", + "\n", + "# --- Prepare regression data (filter NaNs as before) ---\n", + "mask_reg = ~np.isnan(X_feat).any(axis=1) & ~np.isnan(y_reg)\n", + "X_feat_reg = X_feat[mask_reg]\n", + "y_reg_clean = y_reg[mask_reg]\n", + "smiles_reg = np.array(smiles)[mask_reg]\n", + "\n", + "(\n", + " X_train_reg,\n", + " X_test_reg,\n", + " y_train_reg,\n", + " y_test_reg,\n", + " smiles_train_reg,\n", + " smiles_test_reg,\n", + ") = train_test_split(\n", + " X_feat_reg,\n", + " y_reg_clean,\n", + " smiles_reg,\n", + " test_size=0.3,\n", + " random_state=42,\n", + ")\n", + "\n", + "# --- Wrap regressor with CrossConformalCV ---\n", + "rf_reg = RandomForestRegressor(n_estimators=100, random_state=42)\n", + "rf_reg_pipeline = Pipeline(\n", + " [\n", + " (\"rf\", rf_reg),\n", + " ],\n", + " n_jobs=1,\n", + ")\n", + "\n", + "cc_reg = CrossConformalCV(\n", + " estimator=rf_reg_pipeline,\n", + " n_folds=5,\n", + " confidence_level=0.95,\n", + " estimator_type=\"regressor\",\n", + ")\n", + "cc_reg.fit(X_train_reg, y_train_reg)\n", + "\n", + "# --- Predict intervals and point predictions ---\n", + "intervals = np.array([m.predict_int(X_test_reg) for m in cc_reg.models_])\n", + "intervals_mean = intervals.mean(axis=0)\n", + "lower = intervals_mean[:, 0]\n", + "upper = intervals_mean[:, 1]\n", + "point_pred = np.mean([m.predict(X_test_reg) for m in cc_reg.models_], axis=0)\n", + "\n", + "df_cp_reg = pd.DataFrame(\n", + " {\n", + " \"pubchem_smiles\": smiles_test_reg,\n", + " \"pIC50\": y_test_reg,\n", + " \"pred_lower\": lower,\n", + " \"pred_upper\": upper,\n", + " \"point_pred\": point_pred,\n", + " }\n", + ")\n", + "display(df_cp_reg.head())\n", + "\n", + "# --- Regression: Evaluate coverage and interval width ---\n", + "coverage_reg = np.mean((y_test_reg >= lower) & (y_test_reg <= upper))\n", + "avg_width = np.mean(upper - lower)\n", + "mae = np.mean(np.abs(point_pred - y_test_reg))\n", + "\n", + "print(f\"Interval coverage: {coverage_reg:.3f}\")\n", + "print(f\"Average interval width: {avg_width:.3f}\")\n", + "print(f\"MAE (point prediction): {mae:.3f}\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": ".venv", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.3" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/pyproject.toml b/pyproject.toml index cf63cc47..c24678f7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ authors = [ description = "Integration of rdkit functionality into sklearn pipelines." readme = "README.md" dependencies = [ + "crepes>=0.8.0", "joblib>=1.3.0", "loguru>=0.7.3", "matplotlib>=3.10.1", diff --git a/tests/test_experimental/test_uncertainty/__init__.py b/tests/test_experimental/test_uncertainty/__init__.py new file mode 100644 index 00000000..269df2fa --- /dev/null +++ b/tests/test_experimental/test_uncertainty/__init__.py @@ -0,0 +1 @@ +"""Unit tests for conformal prediction wrappers in molpipeline.experimental.uncertainty.conformal.""" diff --git a/tests/test_experimental/test_uncertainty/test_conformal.py b/tests/test_experimental/test_uncertainty/test_conformal.py new file mode 100644 index 00000000..a5fceb0c --- /dev/null +++ b/tests/test_experimental/test_uncertainty/test_conformal.py @@ -0,0 +1,531 @@ +"""Unit tests for conformal prediction wrappers.""" + +import unittest +from pathlib import Path +from typing import Any + +import numpy as np +import numpy.typing as npt +import pandas as pd +from crepes.extras import MondrianCategorizer, hinge, margin +from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor +from sklearn.model_selection import train_test_split + +from molpipeline.any2mol import SmilesToMol +from molpipeline.experimental.uncertainty.conformal import ( + ConformalPredictor, + CrossConformalPredictor, +) +from molpipeline.mol2any import MolToMorganFP + +# Test data directory +TEST_DATA_DIR = Path(__file__).parent.parent.parent / "test_data" + +# Constants for fingerprints +FP_RADIUS = 2 +FP_SIZE = 1024 + + +class TestConformalCV(unittest.TestCase): + """Unit tests for ConformalPredictor and CrossConformalPredictor wrappers.""" + + # Class attributes for test data + x_clf: npt.NDArray[Any] + y_clf: npt.NDArray[Any] + x_reg: npt.NDArray[Any] + y_reg: npt.NDArray[Any] + + @classmethod + def setUpClass(cls) -> None: # pylint: disable=too-many-locals + """Set up test data once for all tests. + + Raises + ------ + ValueError: If no valid data is found after processing. + + """ + # Load data + bbbp_df = pd.read_csv( + TEST_DATA_DIR / "molecule_net_bbbp.tsv.gz", + sep="\t", + compression="gzip", + ) + logd_df = pd.read_csv( + TEST_DATA_DIR / "molecule_net_logd.tsv.gz", + sep="\t", + compression="gzip", + ) + + # Set up pipeline stages separately to handle invalid molecules + smi2mol = SmilesToMol(n_jobs=1) + morgan = MolToMorganFP(radius=FP_RADIUS, n_bits=FP_SIZE, n_jobs=1) + + # Process classification data + bbbp_clean = bbbp_df.dropna(subset=["smiles", "p_np"]) + smiles_list = bbbp_clean["smiles"].tolist() + labels_list = bbbp_clean["p_np"].tolist() + + # Convert SMILES to molecules first, filter out invalid ones + molecules = smi2mol.fit_transform(smiles_list) + valid_clf_data = [] + + for mol, label in zip(molecules, labels_list, strict=False): + # Skip InvalidInstance objects + if mol is None or hasattr( + mol, + "_fields", + ): # InvalidInstance is a NamedTuple + continue + # Generate fingerprint for valid molecule + try: + fp = morgan.transform([mol]) + if fp is not None and hasattr(fp, "toarray"): + valid_clf_data.append((fp.toarray().flatten(), label)) + except (AttributeError, TypeError): + # Skip molecules that can't be processed + continue + + if not valid_clf_data: + raise ValueError("No valid classification data found") + + cls.x_clf, cls.y_clf = map(np.array, zip(*valid_clf_data, strict=False)) + + # Process regression data + logd_clean = logd_df.dropna(subset=["smiles", "exp"]) + smiles_list_reg = logd_clean["smiles"].tolist() + labels_list_reg = logd_clean["exp"].tolist() + + # Convert SMILES to molecules first, filter out invalid ones + molecules_reg = smi2mol.transform(smiles_list_reg) + valid_reg_data = [] + + for mol, label in zip(molecules_reg, labels_list_reg, strict=False): + # Skip InvalidInstance objects + if mol is None or hasattr( + mol, + "_fields", + ): # InvalidInstance is a NamedTuple + continue + # Generate fingerprint for valid molecule - ensure mol is valid + try: + fp = morgan.transform([mol])[0] # type: ignore[list-item] + if fp is not None and hasattr(fp, "toarray"): + valid_reg_data.append((fp.toarray().flatten(), label)) + except (AttributeError, TypeError): + # Skip molecules that can't be processed + continue + + if not valid_reg_data: + raise ValueError("No valid regression data found") + + cls.x_reg, cls.y_reg = map(np.array, zip(*valid_reg_data, strict=False)) + + def test_conformal_prediction_classifier(self) -> None: + """Test ConformalPredictor with a classifier.""" + x_train, x_calib, y_train, y_calib = train_test_split( + self.x_clf, + self.y_clf, + test_size=0.2, + random_state=42, + ) + clf = RandomForestClassifier(random_state=42, n_estimators=5) + cp = ConformalPredictor(clf, estimator_type="classifier") + cp.fit(x_train, y_train) + cp.calibrate(x_calib, y_calib) + preds = cp.predict(x_calib) + probs = cp.predict_proba(x_calib) + sets = cp.predict_conformal_set(x_calib) + p_values = cp.predict_p(x_calib) + + self.assertEqual(len(preds), len(y_calib)) + self.assertEqual(probs.shape[0], len(y_calib)) + self.assertEqual(len(sets), len(y_calib)) + self.assertEqual(len(p_values), len(y_calib)) + + def test_conformal_prediction_regressor(self) -> None: + """Test ConformalPredictor with a regressor.""" + x_train, x_calib, y_train, y_calib = train_test_split( + self.x_reg, + self.y_reg, + test_size=0.2, + random_state=42, + ) + reg = RandomForestRegressor(random_state=42, n_estimators=5) + cp = ConformalPredictor(reg, estimator_type="regressor") + cp.fit(x_train, y_train) + cp.calibrate(x_calib, y_calib) + intervals = cp.predict_int(x_calib) + + self.assertEqual(intervals.shape[0], len(y_calib)) + self.assertEqual(intervals.shape[1], 2) + + def test_confidence_level_effect_regression(self) -> None: + """Test that increasing confidence level increases interval width.""" + x_train, x_calib, y_train, y_calib = train_test_split( + self.x_reg, + self.y_reg, + test_size=0.2, + random_state=42, + ) + reg = RandomForestRegressor(random_state=42, n_estimators=5) + cp = ConformalPredictor(reg, estimator_type="regressor") + cp.fit(x_train, y_train) + cp.calibrate(x_calib, y_calib) + + # Test different confidence levels + intervals_90 = cp.predict_int(x_calib, confidence=0.90) + intervals_95 = cp.predict_int(x_calib, confidence=0.95) + intervals_99 = cp.predict_int(x_calib, confidence=0.99) + + # Calculate average interval widths + width_90 = float(np.mean(intervals_90[:, 1] - intervals_90[:, 0])) + width_95 = float(np.mean(intervals_95[:, 1] - intervals_95[:, 0])) + width_99 = float(np.mean(intervals_99[:, 1] - intervals_99[:, 0])) + + # Higher confidence should lead to wider intervals + self.assertLess(width_90, width_95) + self.assertLess(width_95, width_99) + + def test_confidence_level_effect_classification(self) -> None: + """Test that lower confidence level increases prediction set size.""" + x_train, x_calib, y_train, y_calib = train_test_split( + self.x_clf, + self.y_clf, + test_size=0.2, + random_state=42, + ) + clf = RandomForestClassifier(random_state=42, n_estimators=5) + cp = ConformalPredictor(clf, estimator_type="classifier") + cp.fit(x_train, y_train) + cp.calibrate(x_calib, y_calib) + + # Test different confidence levels + sets_90 = cp.predict_conformal_set(x_calib, confidence=0.90) + sets_95 = cp.predict_conformal_set(x_calib, confidence=0.95) + sets_99 = cp.predict_conformal_set(x_calib, confidence=0.99) + + # Calculate average prediction set sizes + size_90 = float(np.mean([len(s) for s in sets_90])) + size_95 = float(np.mean([len(s) for s in sets_95])) + size_99 = float(np.mean([len(s) for s in sets_99])) + # Higher confidence should lead to larger prediction sets + self.assertLessEqual(size_90, size_95) + self.assertLessEqual(size_95, size_99) + + def test_cross_conformal_classifier(self) -> None: + """Test CrossConformalPredictor with a classifier.""" + clf = RandomForestClassifier(random_state=42, n_estimators=5) + ccp = CrossConformalPredictor(clf, estimator_type="classifier", n_folds=3) + ccp.fit(self.x_clf, self.y_clf) + preds = ccp.predict(self.x_clf) + probs = ccp.predict_proba(self.x_clf) + sets = ccp.predict_conformal_set(self.x_clf) + p_values = ccp.predict_p(self.x_clf) + + self.assertEqual(len(preds), len(self.y_clf)) + self.assertEqual(probs.shape[0], len(self.y_clf)) + self.assertEqual(len(sets), len(self.y_clf)) + self.assertEqual(len(p_values), len(self.y_clf)) + + def test_cross_conformal_regressor(self) -> None: + """Test CrossConformalPredictor with a regressor.""" + reg = RandomForestRegressor(random_state=42, n_estimators=5) + ccp = CrossConformalPredictor(reg, estimator_type="regressor", n_folds=3) + ccp.fit(self.x_reg, self.y_reg) + intervals = ccp.predict_int(self.x_reg) + + # Each model should produce intervals for all samples + for model in ccp.models_: + model_intervals = model.predict_int(self.x_reg) + self.assertEqual(model_intervals.shape[0], len(self.y_reg)) + self.assertEqual(model_intervals.shape[1], 2) + + # Aggregated intervals should have correct shape + self.assertEqual(intervals.shape[0], len(self.y_reg)) + self.assertEqual(intervals.shape[1], 2) + + def test_cross_conformal_confidence_effect_regression(self) -> None: + """Test confidence level effect in cross-conformal regression.""" + reg = RandomForestRegressor(random_state=42, n_estimators=5) + ccp = CrossConformalPredictor(reg, estimator_type="regressor", n_folds=3) + ccp.fit(self.x_reg, self.y_reg) + + # Test different confidence levels + intervals_90 = ccp.predict_int(self.x_reg, confidence=0.90) + intervals_95 = ccp.predict_int(self.x_reg, confidence=0.95) + intervals_99 = ccp.predict_int(self.x_reg, confidence=0.99) + + # Calculate average interval widths + width_90 = float(np.mean(intervals_90[:, 1] - intervals_90[:, 0])) + width_95 = float(np.mean(intervals_95[:, 1] - intervals_95[:, 0])) + width_99 = float(np.mean(intervals_99[:, 1] - intervals_99[:, 0])) + + # Higher confidence should lead to wider intervals + self.assertLess(width_90, width_95) + self.assertLess(width_95, width_99) + + def test_cross_conformal_confidence_effect_classification(self) -> None: + """Test confidence level effect in cross-conformal classification.""" + clf = RandomForestClassifier(random_state=42, n_estimators=5) + ccp = CrossConformalPredictor(clf, estimator_type="classifier", n_folds=3) + ccp.fit(self.x_clf, self.y_clf) + + # Test different confidence levels + sets_90 = ccp.predict_conformal_set(self.x_clf, confidence=0.90) + sets_95 = ccp.predict_conformal_set(self.x_clf, confidence=0.95) + sets_99 = ccp.predict_conformal_set(self.x_clf, confidence=0.99) + + # Calculate average prediction set sizes + size_90 = float(np.mean([len(s) for s in sets_90])) + size_95 = float(np.mean([len(s) for s in sets_95])) + size_99 = float(np.mean([len(s) for s in sets_99])) + + # Higher confidence should lead to larger prediction sets + self.assertLessEqual(size_90, size_95) + self.assertLessEqual(size_95, size_99) + + def test_auto_detection(self) -> None: + """Test automatic estimator type detection.""" + # Test classifier auto-detection + clf = RandomForestClassifier(random_state=42) + cp_clf = ConformalPredictor(clf, estimator_type="auto") + self.assertEqual(cp_clf.estimator_type, "classifier") + + # Test regressor auto-detection + reg = RandomForestRegressor(random_state=42) + cp_reg = ConformalPredictor(reg, estimator_type="auto") + self.assertEqual(cp_reg.estimator_type, "regressor") + + def test_nonconformity_functions(self) -> None: + """Test nonconformity functions for classification.""" + x_train, x_calib, y_train, y_calib = train_test_split( + self.x_clf, + self.y_clf, + test_size=0.2, + random_state=42, + ) + + clf = RandomForestClassifier(random_state=42, n_estimators=5) + + # Test with hinge nonconformity + cp_hinge = ConformalPredictor( + clf, + estimator_type="classifier", + nonconformity=hinge, + ) + cp_hinge.fit(x_train, y_train) + cp_hinge.calibrate(x_calib, y_calib) + sets_hinge = cp_hinge.predict_conformal_set(x_calib) + p_values_hinge = cp_hinge.predict_p(x_calib) + + # Test with margin nonconformity + cp_margin = ConformalPredictor( + clf, + estimator_type="classifier", + nonconformity=margin, + ) + cp_margin.fit(x_train, y_train) + cp_margin.calibrate(x_calib, y_calib) + sets_margin = cp_margin.predict_conformal_set(x_calib) + p_values_margin = cp_margin.predict_p(x_calib) + + # Verify outputs have correct shapes + self.assertEqual(len(sets_hinge), len(y_calib)) + self.assertEqual(len(sets_margin), len(y_calib)) + self.assertEqual(len(p_values_hinge), len(y_calib)) + self.assertEqual(len(p_values_margin), len(y_calib)) + + # Different nonconformity functions should give different results + self.assertNotEqual(sets_hinge, sets_margin) + + def test_mondrian_conformal_classification(self) -> None: + """Test Mondrian conformal prediction for classification.""" + x_train, x_calib, y_train, y_calib = train_test_split( + self.x_clf, + self.y_clf, + test_size=0.2, + random_state=42, + ) + + clf = RandomForestClassifier(random_state=42, n_estimators=5) + + # Test with custom MondrianCategorizer (skip mondrian=True for now) + mc = MondrianCategorizer() + # Simple categorizer based on first feature + mc.fit( + x_calib, + f=lambda x: (x[:, 0] > np.median(x[:, 0])).astype(int), + no_bins=2, + ) + + cp_mondrian_custom = ConformalPredictor( + clf, + estimator_type="classifier", + mondrian=mc, + ) + cp_mondrian_custom.fit(x_train, y_train) + cp_mondrian_custom.calibrate(x_calib, y_calib) + sets_custom = cp_mondrian_custom.predict_conformal_set(x_calib) + p_values_custom = cp_mondrian_custom.predict_p(x_calib) + + # Test without Mondrian (baseline) + cp_baseline = ConformalPredictor( + clf, + estimator_type="classifier", + mondrian=False, + ) + cp_baseline.fit(x_train, y_train) + cp_baseline.calibrate(x_calib, y_calib) + sets_baseline = cp_baseline.predict_conformal_set(x_calib) + + # Verify outputs have correct shapes + self.assertEqual(len(sets_custom), len(sets_baseline)) + self.assertEqual(len(p_values_custom), len(y_calib)) + + # Verify that prediction sets contain valid class indices + for pred_set in sets_custom: + self.assertIsInstance(pred_set, list) + for class_idx in pred_set: + self.assertIsInstance(class_idx, (int, np.integer)) + self.assertGreaterEqual(class_idx, 0) + + self.assertTrue(np.all(p_values_custom >= 0)) + self.assertTrue(np.all(p_values_custom <= 1)) + + def test_mondrian_conformal_regression(self) -> None: + """Test Mondrian conformal prediction for regression.""" + x_train, x_calib, y_train, y_calib = train_test_split( + self.x_reg, + self.y_reg, + test_size=0.2, + random_state=42, + ) + + reg = RandomForestRegressor(random_state=42, n_estimators=5) + + # Test with custom MondrianCategorizer for regression + mc = MondrianCategorizer() + # Categorize based on median of first feature + mc.fit( + x_calib, + f=lambda x: (x[:, 0] > np.median(x[:, 0])).astype(int), + no_bins=2, + ) + + cp_mondrian = ConformalPredictor(reg, estimator_type="regressor", mondrian=mc) + cp_mondrian.fit(x_train, y_train) + cp_mondrian.calibrate(x_calib, y_calib) + intervals_mondrian = cp_mondrian.predict_int(x_calib) + + # Test without Mondrian (baseline) + cp_baseline = ConformalPredictor( + reg, + estimator_type="regressor", + mondrian=False, + ) + cp_baseline.fit(x_train, y_train) + cp_baseline.calibrate(x_calib, y_calib) + intervals_baseline = cp_baseline.predict_int(x_calib) + + # Verify outputs have correct shapes + self.assertEqual(intervals_mondrian.shape, (len(y_calib), 2)) + self.assertEqual(intervals_baseline.shape, (len(y_calib), 2)) + + # Mondrian should give different results than baseline + self.assertFalse(np.array_equal(intervals_mondrian, intervals_baseline)) + + def test_cross_conformal_mondrian_both_classes(self) -> None: + """Test Mondrian with CrossConformalPredictors.""" + # Test classification with custom MondrianCategorizer + clf = RandomForestClassifier(random_state=42, n_estimators=5) + + # Create a simple Mondrian categorizer for classification + mc_clf = MondrianCategorizer() + mc_clf.fit( + self.x_clf, + f=lambda x: (x[:, 0] > np.median(x[:, 0])).astype(int), + no_bins=2, + ) + + ccp_clf = CrossConformalPredictor( + clf, + estimator_type="classifier", + n_folds=3, + mondrian=mc_clf, + random_state=42, + ) + ccp_clf.fit(self.x_clf, self.y_clf) + sets_mondrian = ccp_clf.predict_conformal_set(self.x_clf[:10]) + p_values_mondrian = ccp_clf.predict_p(self.x_clf[:10]) + + # Test without Mondrian for comparison + ccp_clf_baseline = CrossConformalPredictor( + clf, + estimator_type="classifier", + n_folds=3, + mondrian=False, + random_state=42, + ) + ccp_clf_baseline.fit(self.x_clf, self.y_clf) + sets_baseline = ccp_clf_baseline.predict_conformal_set(self.x_clf[:10]) + + # Verify shapes + self.assertEqual(len(sets_mondrian), len(sets_baseline)) + self.assertEqual(len(p_values_mondrian), 10) + + # Test regression with binning (Mondrian-style for regression) + reg = RandomForestRegressor(random_state=42, n_estimators=5) + ccp_reg = CrossConformalPredictor( + reg, + estimator_type="regressor", + n_folds=3, + binning=3, + random_state=42, + ) + ccp_reg.fit(self.x_reg, self.y_reg) + intervals_binned = ccp_reg.predict_int(self.x_reg[:10]) + + # Test without binning for comparison + ccp_reg_baseline = CrossConformalPredictor( + reg, + estimator_type="regressor", + n_folds=3, + binning=None, + random_state=42, + ) + ccp_reg_baseline.fit(self.x_reg, self.y_reg) + intervals_baseline_reg = ccp_reg_baseline.predict_int(self.x_reg[:10]) + + # Verify shapes + self.assertEqual(intervals_binned.shape, (10, 2)) + self.assertEqual(intervals_baseline_reg.shape, (10, 2)) + + def test_error_handling(self) -> None: + """Test error handling for various invalid operations.""" + clf = RandomForestClassifier(random_state=42, n_estimators=5) + cp = ConformalPredictor(clf, estimator_type="classifier") + + # Test prediction before fitting + with self.assertRaises(ValueError): + cp.predict(self.x_clf[:5]) + + # Test calibration before fitting + with self.assertRaises(RuntimeError): + cp.calibrate(self.x_clf[:10], self.y_clf[:10]) + + # Test predict_proba on regressor + reg = RandomForestRegressor(random_state=42, n_estimators=5) + cp_reg = ConformalPredictor(reg, estimator_type="regressor") + cp_reg.fit(self.x_reg[:50], self.y_reg[:50]) + + with self.assertRaises(NotImplementedError): + cp_reg.predict_proba(self.x_reg[:5]) + + # Test predict_int on classifier + cp.fit(self.x_clf[:50], self.y_clf[:50]) + with self.assertRaises(NotImplementedError): + cp.predict_int(self.x_clf[:5]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 84eb6ae4..9b7510d5 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -1,3 +1,5 @@ +# pylint: disable=too-many-locals, import-outside-toplevel, invalid-name + """Test functionality of the pipeline class.""" from __future__ import annotations @@ -54,7 +56,7 @@ def test_fit_transform_single_core(self) -> None: [ ("smi2mol", smi2mol), ("morgan", mol2morgan), - ] + ], ) # Run pipeline @@ -73,11 +75,11 @@ def test_sklearn_pipeline(self) -> None: ("smi2mol", smi2mol), ("morgan", mol2morgan), ("decision_tree", d_tree), - ] + ], ) s_pipeline.fit(TEST_SMILES, CONTAINS_OX) predicted_value_array = s_pipeline.predict(TEST_SMILES) - for pred_val, true_val in zip(predicted_value_array, CONTAINS_OX): + for pred_val, true_val in zip(predicted_value_array, CONTAINS_OX, strict=False): self.assertEqual(pred_val, true_val) def test_sklearn_pipeline_parallel(self) -> None: @@ -96,7 +98,7 @@ def test_sklearn_pipeline_parallel(self) -> None: s_pipeline.fit(TEST_SMILES, CONTAINS_OX) out = s_pipeline.predict(TEST_SMILES) self.assertEqual(len(out), len(CONTAINS_OX)) - for pred_val, true_val in zip(out, CONTAINS_OX): + for pred_val, true_val in zip(out, CONTAINS_OX, strict=False): self.assertEqual(pred_val, true_val) def test_salt_removal(self) -> None: @@ -119,16 +121,21 @@ def test_salt_removal(self) -> None: ("empty_mol_filter", empty_mol_filter), ("remove_charge", remove_charge), ("mol2smi", mol2smi), - ] + ], ) - generated_smiles = salt_remover_pipeline.transform(smiles_with_salt_list) - for generated_smiles, smiles_without_salt in zip( - generated_smiles, smiles_without_salt_list + generated_smiles_list = salt_remover_pipeline.transform(smiles_with_salt_list) + for generated_smi, smiles_without_salt in zip( + generated_smiles_list, + smiles_without_salt_list, + strict=False, ): - self.assertEqual(generated_smiles, smiles_without_salt) + self.assertEqual(generated_smi, smiles_without_salt) def test_json_generation(self) -> None: - """Test that the json representation of a pipeline can be loaded back into a pipeline.""" + """Test that the json representation of a pipeline can be loaded back. + + This test verifies that a pipeline can be loaded back into a pipeline. + """ # Create pipeline smi2mol = SmilesToMol() metal_disconnector = MetalDisconnector() @@ -146,7 +153,7 @@ def test_json_generation(self) -> None: ("metal_disconnector", metal_disconnector), ("salt_remover", salt_remover), ("physchem", physchem), - ] + ], ) # Convert pipeline to json @@ -156,7 +163,9 @@ def test_json_generation(self) -> None: self.assertTrue(isinstance(loaded_pipeline, Pipeline)) # Compare pipeline elements for loaded_element, original_element in zip( - loaded_pipeline.steps, pipeline_element_list + loaded_pipeline.steps, + pipeline_element_list, + strict=False, ): if loaded_element[1] == "passthrough": self.assertEqual(loaded_element[1], original_element) @@ -176,7 +185,7 @@ def test_fit_transform_record_remove_nones(self) -> None: mol2morgan = MolToMorganFP(radius=FP_RADIUS, n_bits=FP_SIZE) empty_mol_filter = EmptyMoleculeFilter() remove_none = ErrorFilter.from_element_list( - [smi2mol, salt_remover, mol2morgan, empty_mol_filter] + [smi2mol, salt_remover, mol2morgan, empty_mol_filter], ) # Create pipeline pipeline = Pipeline( @@ -191,13 +200,16 @@ def test_fit_transform_record_remove_nones(self) -> None: # Run pipeline matrix = pipeline.fit_transform(TEST_SMILES + FAULTY_TEST_SMILES) - # Compare with expected output (Which is the same as the output without the faulty smiles) + # Compare with expected output + # (Which is the same as the output without the faulty smiles) self.assertTrue(are_equal(EXPECTED_OUTPUT, matrix)) def test_caching(self) -> None: - """Test if the caching gives the same results and is faster on the second run.""" + """Test if the caching gives the same results & is faster on the second run.""" molecule_net_logd_df = pd.read_csv( - TEST_DATA_DIR / "molecule_net_logd.tsv.gz", sep="\t", nrows=20 + TEST_DATA_DIR / "molecule_net_logd.tsv.gz", + sep="\t", + nrows=20, ) prediction_list = [] for cache_activated in [False, True]: @@ -235,7 +247,8 @@ def test_caching(self) -> None: n_transformations = pipeline.named_steps["mol2concat"].n_transformations if cache_activated: - # Fit is called twice, but the transform is only called once, since the second run is cached + # Fit is called twice, but the transform is only called once, + # since the second run is cached self.assertEqual(n_transformations, 1) else: self.assertEqual(n_transformations, 2) @@ -263,7 +276,7 @@ def test_gridsearchcv(self) -> None: "physchem__descriptor_list": [ ["HeavyAtomMolWt"], ["HeavyAtomMolWt", "HeavyAtomCount"], - ] + ], }, }, ] @@ -273,7 +286,8 @@ def test_gridsearchcv(self) -> None: element = test_data_dict["element"] param_grid = test_data_dict["param_grid"] - # set up a pipeline that trains a random forest classifier on morgan fingerprints + # set up a pipeline that trains + # a random forest classifier on morgan fingerprints pipeline = Pipeline( [ ("auto2mol", AutoToMol()), @@ -307,13 +321,15 @@ def test_gridsearchcv(self) -> None: self.assertIn(grid_search_cv.best_params_[k], value) def test_gridsearch_cache(self) -> None: - """Run a short GridSearchCV and check if the caching and not caching gives the same results.""" + """Run GridSearchCV and check caching vs not caching gives same results.""" h_params = { "rf__n_estimators": [1, 2], } # First without caching data_df = pd.read_csv( - TEST_DATA_DIR / "molecule_net_logd.tsv.gz", sep="\t", nrows=20 + TEST_DATA_DIR / "molecule_net_logd.tsv.gz", + sep="\t", + nrows=20, ) best_param_dict = {} prediction_dict = {} @@ -339,7 +355,7 @@ def test_gridsearch_cache(self) -> None: grid_search_cv.fit(data_df["smiles"].tolist(), data_df["exp"].tolist()) best_param_dict[cache_activated] = grid_search_cv.best_params_ prediction_dict[cache_activated] = grid_search_cv.predict( - data_df["smiles"].tolist() + data_df["smiles"].tolist(), ) mem.clear(warn=False) self.assertEqual(best_param_dict[True], best_param_dict[False]) @@ -360,13 +376,16 @@ def test_calibrated_classifier(self) -> None: ( "error_replacer", PostPredictionWrapper( - FilterReinserter.from_error_filter(error_filter, np.nan) + FilterReinserter.from_error_filter(error_filter, np.nan), ), ), - ] + ], ) calibrated_pipeline = CalibratedClassifierCV( - s_pipeline, cv=2, ensemble=True, method="isotonic" + s_pipeline, + cv=2, + ensemble=True, + method="isotonic", ) calibrated_pipeline.fit(TEST_SMILES, CONTAINS_OX) predicted_value_array = calibrated_pipeline.predict(TEST_SMILES)