diff --git a/src/poli_baselines/core/utils/bo_pr/get_problem.py b/src/poli_baselines/core/utils/bo_pr/get_problem.py
new file mode 100644
index 0000000..c83e9f4
--- /dev/null
+++ b/src/poli_baselines/core/utils/bo_pr/get_problem.py
@@ -0,0 +1,51 @@
+import torch
+
+from botorch.utils.multi_objective import infer_reference_point
+
+from discrete_mixed_bo.problems.base import DiscreteTestProblem
+
+from .poli_objective_in_pr import (
+    PoliObjective,
+    PoliMultiObjective,
+    PoliDiscreteObjective,
+)
+
+
+def get_problem(name: str, **kwargs) -> DiscreteTestProblem:
+    r"""Initialize the test function."""
+    if name == "poli":
+        # NOTE: one could dispatch on dimensionality here, e.g. using
+        # PoliObjective (one-hot) for small problems, but we always
+        # use the strictly discrete objective:
+        return PoliDiscreteObjective(
+            black_box=kwargs["black_box"],
+            alphabet=kwargs.get("alphabet", None),
+            sequence_length=kwargs.get("sequence_length", None),
+            integer_indices=list(range(kwargs.get("sequence_length", None))),
+            negate=kwargs.get("negate", False),
+            tokenizer=kwargs.get("tokenizer"),
+        )
+    elif name == "poli_moo":
+        alphabet = kwargs.get("alphabet", None)
+        s_len = kwargs.get("sequence_length", None)
+        if s_len is None:
+            raise ValueError("sequence_length must be provided for 'poli_moo'.")
+        # Per-position bounds on the integer encoding: row 0 holds the
+        # lower bounds, row 1 the upper bounds.
+        integer_bounds = torch.zeros(2, s_len)
+        integer_bounds[1, :] = len(alphabet)
+        problem = PoliMultiObjective(
+            black_box=kwargs["black_box"],
+            alphabet=alphabet,
+            sequence_length=s_len,
+            negate=kwargs.get("negate", False),
+            ref_point=infer_reference_point(
+                torch.from_numpy(kwargs.get("y0", None))
+            ),  # NOTE from infer_reference_point: this assumes maximization of all objectives.
+            integer_indices=list(range(s_len)),
+            integer_bounds=integer_bounds,
+            x0=kwargs.get("x0", None),
+        )
+        return problem
+    else:
+        raise ValueError(f"Unknown function name: {name}!")
diff --git a/src/poli_baselines/core/utils/bo_pr/poli_objective_in_pr.py b/src/poli_baselines/core/utils/bo_pr/poli_objective_in_pr.py
index 1f4babb..85bcd36 100644
--- a/src/poli_baselines/core/utils/bo_pr/poli_objective_in_pr.py
+++ b/src/poli_baselines/core/utils/bo_pr/poli_objective_in_pr.py
@@ -9,14 +9,18 @@
 """
 
 from __future__ import annotations
-
+from typing import List, Optional
 import numpy as np
 import torch
 
+from botorch.utils.torch import BufferDict
+
 from poli.core.abstract_black_box import AbstractBlackBox
 
-from discrete_mixed_bo.problems.base import DiscreteTestProblem
+from discrete_mixed_bo.problems.base import (
+    DiscreteTestProblem,
+    MultiObjectiveTestProblem,
+)
 
 
 class PoliObjective(DiscreteTestProblem):
@@ -54,3 +58,151 @@ def evaluate_true(self, X: torch.Tensor):
 
         # 2. evaluate the black box
         return torch.from_numpy(self.black_box(np.array(x_str)))
+
+
+class PoliDiscreteObjective(DiscreteTestProblem):
+    """
+    A bridge between poli black boxes and PR: a strictly discrete,
+    single-objective problem (no one-hot encoding).
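+
+    Candidates arrive as integer tensors: position ``i`` holds an index into
+    the alphabet, and each row is decoded back into its symbols before the
+    black box is called.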
+ """ + + _discrete_values: dict + _bounds: list + + def __init__( + self, + black_box: AbstractBlackBox, + sequence_length: int, + alphabet: list[str] | None = None, + noise_std: float | None = None, + negate: bool = False, + integer_indices: Optional[List[int]] = None, + categorical_indices: Optional[List[int]] = None, + tokenizer: object = None, + ) -> None: + self.dim = sequence_length + self.black_box = black_box + self.tokenizer = tokenizer + alphabet = alphabet or self.black_box.info.alphabet + if alphabet is None: + raise ValueError("Alphabet must be provided.") + # if integer_indices is None: + # integer_indices = [i for i in range(sequence_length)] + + self._bounds = [(0, len(alphabet) - 1) for _ in range(sequence_length)] + self.alphabet_s_to_i = {s: i for i, s in enumerate(alphabet)} + self.alphabet_i_to_s = {i: s for i, s in enumerate(alphabet)} + super().__init__( + noise_std, negate, categorical_indices=list(range(sequence_length)) + ) + self._setup( + integer_indices=integer_indices, categorical_indices=categorical_indices + ) + self.discrete_values = BufferDict() + self._discrete_values = { + f"pos_{i}": list(self.alphabet_s_to_i.values()) + for i in range(sequence_length) + } + for v in self._discrete_values.values(): + self._bounds.append((0, len(alphabet))) + + def evaluate_true(self, X: torch.Tensor): + # Evaluate true seems to be expecting + # a tensor of integers. + if X.ndim == 1: + X = X.unsqueeze(0) + + # 1. transform to a list of strings + x_str = [ + [self.alphabet_i_to_s[int(i)] for i in x_i] for x_i in X.numpy(force=True) + ] + + # 2. evaluate the black box + return torch.from_numpy(self.black_box(np.array(x_str))) + + +class PoliMultiObjective(DiscreteTestProblem, MultiObjectiveTestProblem): + """ + A bridge between poli black boxes and PR. + """ + + num_objectives: int + _ref_point: List[float] + _discrete_values: dict + _bounds: list + + def __init__( + self, + black_box: AbstractBlackBox, + x0: np.ndarray, + sequence_length: int, + alphabet: List[str] = None, + noise_std: float = None, + negate: bool = False, + integer_indices=None, + integer_bounds=None, + ref_point: List[float] = None, + preserve_len: bool = True, + ) -> None: + self._bounds = [(0, len(alphabet) - 1) for _ in range(sequence_length)] + if "" == alphabet[0]: + self._bounds = [ + (1, len(alphabet) - 1) for _ in range(sequence_length) + ] # eliminate pad symbol from sampling + self.dim = sequence_length + self.black_box = black_box + alphabet = alphabet or self.black_box.info.alphabet + self._ref_point = ( + ref_point # NOTE: this assumes maximization of all objectives. 
+        )
+        self.num_objectives = ref_point.shape[-1]
+        self.sequence_length = sequence_length
+        # Lengths of the initial sequences, ignoring pad symbols.
+        self.Ls = [len(x[x != ""]) for x in x0]
+        self.preserve_len = preserve_len
+
+        self.alphabet_s_to_i = {s: i for i, s in enumerate(alphabet)}
+        self.alphabet_i_to_s = {i: s for i, s in enumerate(alphabet)}
+        MultiObjectiveTestProblem.__init__(
+            self,
+            noise_std=noise_std,
+            negate=negate,
+        )
+        self._setup(integer_indices=integer_indices)
+        self.discrete_values = BufferDict()
+        self._discrete_values = {
+            f"pos_{i}": list(self.alphabet_s_to_i.values())
+            for i in range(sequence_length)
+        }
+        # One extra bound per categorical position.
+        for _ in self._discrete_values:
+            self._bounds.append((0, len(alphabet)))
+
+    def _consistent_length(self, x: List[List[str]]):
+        # Clip or pad candidates so that their lengths match one of the
+        # lengths observed in x0.
+        x = np.asarray(x)  # ensure elementwise comparison against ""
+        valid_x = []
+        for _x in x:
+            cand_len = len(_x[_x != ""])
+            if cand_len not in self.Ls:
+                closest_len = min(
+                    self.Ls, key=lambda L: abs(L - cand_len)
+                )  # clip to the closest observed length
+                valid_x.append(
+                    list(_x[:closest_len]) + [""] * (self.sequence_length - closest_len)
+                )
+            else:
+                valid_x.append(_x)
+        return np.vstack(valid_x)
+
+    def evaluate_true(self, X: torch.Tensor):
+        # evaluate_true receives a tensor of integer indices.
+        if X.ndim == 1:
+            X = X.unsqueeze(0)
+
+        # 1. transform to a list of strings
+        x_str = [
+            [self.alphabet_i_to_s[int(i)] for i in x_i] for x_i in X.numpy(force=True)
+        ]
+        if self.preserve_len:
+            x_str = self._consistent_length(x_str)
+        # 2. evaluate the black box
+        return torch.from_numpy(self.black_box(np.array(x_str)))
diff --git a/src/poli_baselines/core/utils/bo_pr/run_one_replication.py b/src/poli_baselines/core/utils/bo_pr/run_one_replication.py
index 60d738f..edb33c3 100644
--- a/src/poli_baselines/core/utils/bo_pr/run_one_replication.py
+++ b/src/poli_baselines/core/utils/bo_pr/run_one_replication.py
@@ -37,7 +37,6 @@
     generate_initial_data,
     get_acqf,
     get_exact_rounding_func,
-    get_problem,
    initialize_model,
 )
 from discrete_mixed_bo.input import OneHotToNumeric
@@ -48,7 +47,7 @@
 )
 from discrete_mixed_bo.trust_region import TurboState, update_state
 
-from .poli_objective_in_pr import PoliObjective
+from poli_baselines.core.utils.bo_pr.get_problem import get_problem
 
 supported_labels = [
     "sobol",
@@ -82,16 +81,16 @@
 """
 We modify this implementation slightly
-by introducing poli black boxes instead of
-function names.
+by restricting problem construction to the
+poli-defined objectives in .get_problem.
 """
 
 
-def run_one_replication_on_poli_black_box(
+def run_one_replication(
     seed: int,
     label: str,
     iterations: int,
-    black_box: AbstractBlackBox,
+    function_name: str,
     batch_size: int,
     mc_samples: int,
     n_initial_points: Optional[int] = None,
@@ -140,7 +139,7 @@
     optimization_kwargs = optimization_kwargs or {}
     # TODO: use model list when there are constraints
    # or multiple objectives
-    base_function = PoliObjective(black_box, **problem_kwargs)
+    base_function = get_problem(name=function_name, **problem_kwargs)
     base_function.to(**tkwargs)
     binary_dims = base_function.integer_indices
     binary_mask = base_function.integer_bounds[1] - base_function.integer_bounds[0] == 1
diff --git a/src/poli_baselines/solvers/bayesian_optimization/pr/solver.py b/src/poli_baselines/solvers/bayesian_optimization/pr/solver.py
index 47d21da..3d3539a 100644
--- a/src/poli_baselines/solvers/bayesian_optimization/pr/solver.py
+++ b/src/poli_baselines/solvers/bayesian_optimization/pr/solver.py
@@ -3,6 +3,7 @@
 """
 
 from __future__ import annotations
+import logging
 
 from typing import Literal
 
@@ -11,10 +12,11 @@
 
 from poli.core.abstract_black_box import AbstractBlackBox
+from poli.core.multi_objective_black_box import MultiObjectiveBlackBox
 
 from poli_baselines.core.abstract_solver import AbstractSolver
 from poli_baselines.core.utils.bo_pr.run_one_replication import (
-    run_one_replication_on_poli_black_box,
+    run_one_replication,
 )
 
@@ -39,6 +41,7 @@ def __init__(
         alphabet: list[str] | None = None,
         noise_std: float | None = None,
         use_fixed_noise: bool = False,
+        tokenizer: object = None,
         label: Literal[
             "sobol",
             "cont_optim__round_after__ei",
@@ -85,9 +88,16 @@ def __init__(
             raise ValueError(
                 f"For this specific black box ({self.black_box.info.name}), an alphabet must be provided."
             )
+        self.add_padding_element = any("" in x for x in x0)
         self.alphabet = alphabet_
+        if self.add_padding_element:
+            logging.warning(
+                "Padding element '' found in x0 and prepended to the alphabet; this may be undesired behavior."
+            )
+            self.alphabet = [""] + alphabet_
         self.alphabet_s_to_i = {s: i for i, s in enumerate(self.alphabet)}
         self.alphabet_i_to_s = {i: s for i, s in enumerate(self.alphabet)}
+        self.tokenizer = tokenizer
 
         if isinstance(x0, np.ndarray):
             # Checking that it's of the form [_, L], where
@@ -114,32 +124,62 @@ def solve(
     ):
         if self.x0 is not None:
             # We need to transform it to a tensor of integers.
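+            # If a tokenizer was provided, re-tokenize the joined sequence
+            # (dropping empty tokens) before mapping symbols to integers.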
-            X_init_ = [[self.alphabet_s_to_i[s] for s in x_i] for x_i in self.x0]
+            if self.tokenizer is not None:  # tokenize if one is provided
+                X_init_ = [
+                    [
+                        self.alphabet_s_to_i[s]
+                        for s in [s for s in self.tokenizer("".join(x_i)) if s]
+                    ]
+                    for x_i in self.x0
+                ]
+            else:
+                X_init_ = [[self.alphabet_s_to_i[s] for s in x_i] for x_i in self.x0]
+            if not all(
+                len(x) == len(X_init_[0]) for x in X_init_
+            ):  # ragged rows: the tokenizer may have dropped pad symbols
+                max_len = max(len(x) for x in X_init_)
+                X_init_ = np.vstack(
+                    [
+                        list(x) + [self.alphabet_s_to_i[""]] * int(max_len - len(x))
+                        for x in X_init_
+                    ]
+                )
             X_init = torch.Tensor(X_init_).long()
-            X_init = torch.nn.functional.one_hot(X_init, len(self.alphabet)).flatten(
-                start_dim=1
-            )
+            # One-hot encoding is no longer needed: the discrete objectives
+            # consume integer indices directly.
+            # X_init = torch.nn.functional.one_hot(X_init, len(self.alphabet)).flatten(
+            #     start_dim=1
+            # )
         else:
             X_init = None
 
         if self.y0 is None:
             Y_init = None
+            is_moo = False
         else:
             Y_init = torch.from_numpy(self.y0)
+            is_moo = Y_init.shape[1] > 1
+
+        if is_moo or isinstance(self.black_box, MultiObjectiveBlackBox):
+            function_name = "poli_moo"
+        else:
+            function_name = "poli"
 
-        run_one_replication_on_poli_black_box(
+        run_one_replication(
             seed=self.seed,
             label=self.label,
             iterations=max_iter,
-            black_box=self.black_box,
+            function_name=function_name,
             batch_size=self.batch_size,
             mc_samples=self.mc_samples,
             n_initial_points=self.n_initial_points,
             problem_kwargs={
+                "black_box": self.black_box,
                 "sequence_length": self.sequence_length,
                 "alphabet": self.alphabet,
                 "negate": False,
                 "noise_std": self.noise_std,
+                "y0": self.y0,
+                "x0": self.x0,
+                "tokenizer": self.tokenizer,
             },
             model_kwargs={
                 "use_fixed_noise": self.use_fixed_noise,