Source code for smt.applications.mixed_integer

"""
Author: Remi Lafage <remi.lafage@onera.fr>

This package is distributed under New BSD license.
"""

import warnings

import numpy as np

from smt.sampling_methods.sampling_method import SamplingMethod
from smt.surrogate_models.krg_based import KrgBased, MixIntKernelType
from smt.surrogate_models.surrogate_model import SurrogateModel
from smt.utils.checks import ensure_2d_array

from smt.design_space import (
    BaseDesignSpace,
    CategoricalVariable,
    ensure_design_space,
)


class MixedIntegerSamplingMethod(SamplingMethod):
    """
    Mixed-integer wrapper around an SMT continuous sampling method.

    The wrapped sampling method is instantiated on the unfolded numerical
    bounds of the design space, while the DOE returned by this wrapper is
    drawn through the design space itself so that integer (ORD) and
    categorical (ENUM) features are handled.
    """

    def __init__(self, sampling_method_class, design_space, **kwargs):
        """
        Parameters
        ----------
        sampling_method_class: class name
            SMT sampling method class
        design_space: BaseDesignSpace
            design space definition; its ``random_state`` attribute is set
            from the 'random_state' option when given, and to 42 when it is
            still None
        kwargs: options of the given sampling method
            options used to instantiate the SMT sampling method, plus the
            additional 'output_in_folded_space' boolean option (default True)
            specifying if the doe output should be in folded space
            (enum indexes) or not (enum masks)
        """
        self._design_space = design_space

        # An explicit seed always wins; otherwise only fill in a default seed
        # when the design space does not carry one yet.
        if "random_state" not in kwargs:
            if design_space.random_state is None:
                design_space.random_state = 42
        else:
            design_space.random_state = kwargs["random_state"]

        bounds = design_space.get_unfolded_num_bounds()
        self._unfolded_xlimits = bounds

        # 'output_in_folded_space' belongs to this wrapper only: remove it
        # before the remaining options are forwarded to the wrapped sampler.
        self._output_in_folded_space = kwargs.pop("output_in_folded_space", True)
        self._sampling_method = sampling_method_class(xlimits=bounds, **kwargs)
        super().__init__(xlimits=bounds)

    def _compute(self, nt, return_is_acting=False):
        """
        Sample nt points through ``design_space.sample_valid_x``.

        Returns the sampled points, or the pair (points, is_acting) when
        return_is_acting is True.
        """
        space = self._design_space
        x_doe, is_acting = space.sample_valid_x(
            nt,
            unfolded=not self._output_in_folded_space,
            random_state=space.random_state,
        )
        if not return_is_acting:
            return x_doe
        return x_doe, is_acting

    def __call__(self, nt, return_is_acting=False):
        """Shortcut for ``_compute(nt, return_is_acting)``."""
        return self._compute(nt, return_is_acting=return_is_acting)

    def expand_lhs(self, x, nt, method="basic"):
        """
        Draw nt points with the wrapped sampling method, correct them with the
        design space and fold them when the output is expected in folded space.

        NOTE(review): x and method are accepted but not used by this
        implementation - confirm whether that is intended.
        """
        raw_doe = self._sampling_method(nt)
        corrected, _ = self._design_space.correct_get_acting(raw_doe)
        if not self._output_in_folded_space:
            return corrected
        folded, _ = self._design_space.fold_x(corrected)
        return folded


class MixedIntegerSurrogateModel(SurrogateModel):
    """
    Surrogate model (not Kriging) decorator that takes an SMT continuous surrogate model and
    casts values according to the x types specification to implement a surrogate model
    handling integer (ORD) or categorical (ENUM) features
    """

    def __init__(
        self,
        design_space,
        surrogate,
        input_in_folded_space=True,
    ):
        """
        Parameters
        ----------
        design_space: BaseDesignSpace
            design space definition
        surrogate: SMT surrogate model (not Kriging)
            instance of a SMT surrogate model
        input_in_folded_space: bool
            whether x data are given in folded space (enum indexes) or not (enum masks)

        Raises
        ------
        ValueError
            if surrogate is a Kriging-based model, or if it exposes a 'poly'
            option whose value is not "constant"
        """
        super().__init__()
        self._surrogate = surrogate
        if isinstance(self._surrogate, KrgBased):
            raise ValueError(
                "Using MixedIntegerSurrogateModel integer model with "
                + str(self._surrogate.name)
                + " is not supported. Please use MixedIntegerKrigingModel instead."
            )
        self.design_space = ensure_design_space(design_space=design_space)

        self._input_in_folded_space = input_in_folded_space
        # Same dict object as the wrapped model's, not a copy.
        self.supports = self._surrogate.supports
        self.options["print_global"] = False
        if "poly" in self._surrogate.options:
            if self._surrogate.options["poly"] != "constant":
                raise ValueError("constant regression must be used with mixed integer")

    @property
    def name(self):
        """Name of the wrapped surrogate prefixed with "MixedInteger"."""
        return "MixedInteger" + self._surrogate.name

    def _initialize(self):
        self.supports["derivatives"] = False

    def set_training_values(self, xt, yt, name=None) -> None:
        """
        Correct xt with the design space, unfold it when inputs are given in
        folded space, and register the result both on this wrapper and on the
        wrapped surrogate.
        """
        xt = ensure_2d_array(xt, "xt")

        # Round inputs
        design_space = self.design_space
        xt, _ = design_space.correct_get_acting(xt)

        if self._input_in_folded_space:
            # Only the unfolded samples are needed here. Index rather than
            # unpack so the call does not depend on how many trailing values
            # unfold_x returns after the samples.
            xt_apply = design_space.unfold_x(xt)[0]
        else:
            xt_apply = xt

        super().set_training_values(xt_apply, yt, name)
        self._surrogate.set_training_values(xt_apply, yt, name)

    def update_training_values(self, yt, name=None):
        """Update the training outputs on this wrapper and on the wrapped surrogate."""
        super().update_training_values(yt, name)
        self._surrogate.update_training_values(yt, name)

    def _train(self):
        # Training is fully delegated to the wrapped surrogate.
        self._surrogate._train()

    def predict_values(self, x: np.ndarray) -> np.ndarray:
        """Predict with the wrapped surrogate at the corrected (and possibly unfolded) x."""
        x_corr, _ = self._get_x_for_surrogate_model(x)
        return self._surrogate.predict_values(x_corr)

    def _predict_intermediate_values(self, x: np.ndarray, lvl) -> np.ndarray:
        x_corr, _ = self._get_x_for_surrogate_model(x)
        return self._surrogate._predict_intermediate_values(x_corr, lvl)

    def predict_variances(self, x: np.ndarray) -> np.ndarray:
        """Variances from the wrapped surrogate at the corrected (and possibly unfolded) x."""
        x_corr, _ = self._get_x_for_surrogate_model(x)
        return self._surrogate.predict_variances(x_corr)

    def predict_variances_all_levels(self, x: np.ndarray) -> np.ndarray:
        x_corr, _ = self._get_x_for_surrogate_model(x)
        return self._surrogate.predict_variances_all_levels(x_corr)

    def _get_x_for_surrogate_model(self, x):
        """
        Return (x_corr, is_acting): x corrected by the design space and, when
        inputs are given in folded space, unfolded for the wrapped surrogate.
        """
        xp = ensure_2d_array(x, "xp")

        x_corr, is_acting = self.design_space.correct_get_acting(xp)
        if self._input_in_folded_space:
            # Keep the first two results (samples, is_acting); slicing instead
            # of unpacking tolerates extra trailing values from unfold_x.
            x_corr, is_acting = self.design_space.unfold_x(
                x_corr, is_acting=is_acting
            )[:2]
        return x_corr, is_acting

    def _predict_values(self, x: np.ndarray) -> np.ndarray:
        # Intentionally empty: predict_values is overridden above and
        # delegates directly to the wrapped surrogate.
        pass


class MixedIntegerKrigingModel(KrgBased):
    """
    Kriging model decorator that takes an SMT Kriging surrogate model and
    casts values according to the x types specification to implement a surrogate model
    handling integer (ORD) or categorical (ENUM) features
    """

    def __init__(
        self,
        surrogate,
        input_in_folded_space=True,
    ):
        """
        Parameters
        ----------
        surrogate: SMT Kriging surrogate model
            instance of a SMT Kriging surrogate model
        input_in_folded_space: bool
            whether x data are given in folded space (enum indexes) or not (enum masks);
            forced to False whenever the surrogate's 'categorical_kernel'
            option is not None at the end of construction

        Raises
        ------
        ValueError
            if surrogate is not a Kriging-based model, or if it exposes a
            'poly' option whose value is not "constant"
        """
        super().__init__()
        self._surrogate = surrogate
        if not isinstance(self._surrogate, KrgBased):
            raise ValueError(
                "Using MixedIntegerKrigingModel integer model with "
                + str(self._surrogate.name)
                + " is not supported. Please use MixedIntegerSurrogateModel instead."
            )
        self.options["design_space"] = self._surrogate.design_space
        if surrogate.options["hyper_opt"] == "TNC":
            warnings.warn(
                "TNC not available yet for mixed integer handling. Switching to Cobyla"
            )

        # The optimizer is forced to Cobyla whatever the previous value was.
        self._surrogate.options["hyper_opt"] = "Cobyla"

        self._input_in_folded_space = input_in_folded_space
        # Same dict object as the wrapped model's, not a copy.
        self.supports = self._surrogate.supports
        self.options["print_global"] = False

        if "poly" in self._surrogate.options:
            if self._surrogate.options["poly"] != "constant":
                raise ValueError("constant regression must be used with mixed integer")

        design_space = self.design_space
        if (
            any(
                isinstance(dv, CategoricalVariable)
                for dv in design_space.design_variables
            )
            and self._surrogate.options["categorical_kernel"] is None
        ):
            # Categorical variables need a categorical kernel: pick a default one.
            self._surrogate.options["categorical_kernel"] = (
                MixIntKernelType.HOMO_HSPHERE
            )
            warnings.warn(
                "Using MixedIntegerSurrogateModel integer model with Continuous Relaxation is not supported. \
                    Switched to homoscedastic hypersphere kernel instead."
            )
        if self._surrogate.options["categorical_kernel"] is not None:
            # With a categorical kernel the inputs are never unfolded by this wrapper.
            self._input_in_folded_space = False

        # Mirror on this wrapper every option of the wrapped surrogate that
        # this wrapper also declares.
        for key, value in self._surrogate.options._dict.items():
            if key in self.options._dict:
                self.options._dict[key] = value

    @property
    def name(self):
        """Name of the wrapped surrogate prefixed with "MixedInteger"."""
        return "MixedInteger" + self._surrogate.name

    def _initialize(self):
        super()._initialize()
        self.supports["derivatives"] = False

    def set_training_values(self, xt, yt, name=None, is_acting=None):
        """
        Register training data on this wrapper and on the wrapped surrogate.

        When is_acting is None, xt is first corrected by the design space,
        which also provides is_acting. When inputs are given in folded space,
        both xt and is_acting are unfolded before being registered.
        """
        xt = ensure_2d_array(xt, "xt")

        # If the is_acting matrix is not given, assume input is not corrected (rounding, imputation, etc.) yet
        design_space = self.design_space
        if is_acting is None:
            xt, is_acting = design_space.correct_get_acting(xt)

        if self._input_in_folded_space:
            # Keep the first two results (samples, is_acting); slicing instead
            # of unpacking tolerates extra trailing values from unfold_x.
            xt_apply, is_acting_apply = design_space.unfold_x(xt, is_acting)[:2]
        else:
            xt_apply, is_acting_apply = xt, is_acting

        super().set_training_values(xt_apply, yt, name, is_acting=is_acting_apply)
        self._surrogate.set_training_values(
            xt_apply, yt, name, is_acting=is_acting_apply
        )

    def update_training_values(self, yt, name=None):
        """Update the training outputs on this wrapper and on the wrapped surrogate."""
        super().update_training_values(yt, name)
        self._surrogate.update_training_values(yt, name)

    def _train(self):
        # Training is fully delegated to the wrapped surrogate.
        self._surrogate._train()

    def predict_values(self, x: np.ndarray, is_acting=None) -> np.ndarray:
        """
        Predict with the wrapped surrogate at the corrected x.

        The is_acting argument is not used: it is recomputed from x by the
        design space.
        """
        # Works on a copy of x, presumably to shield the caller's array from
        # the correction step - TODO confirm.
        x_corr, is_acting = self._get_x_for_surrogate_model(np.copy(x))
        return self._surrogate.predict_values(x_corr, is_acting=is_acting)

    def _predict_intermediate_values(
        self, x: np.ndarray, lvl, is_acting=None
    ) -> np.ndarray:
        # The is_acting argument is not used: it is recomputed from x.
        x_corr, is_acting = self._get_x_for_surrogate_model(x)
        return self._surrogate._predict_intermediate_values(
            x_corr, lvl, is_acting=is_acting
        )

    def predict_variances(self, x: np.ndarray, is_acting=None) -> np.ndarray:
        """
        Variances from the wrapped surrogate at the corrected x.

        The is_acting argument is not used: it is recomputed from x by the
        design space.
        """
        x_corr, is_acting = self._get_x_for_surrogate_model(np.copy(x))
        return self._surrogate.predict_variances(x_corr, is_acting=is_acting)

    def predict_variances_all_levels(self, x: np.ndarray, is_acting=None) -> np.ndarray:
        # The is_acting argument is not used: it is recomputed from x.
        x_corr, is_acting = self._get_x_for_surrogate_model(x)
        return self._surrogate.predict_variances_all_levels(x_corr, is_acting=is_acting)

    def _get_x_for_surrogate_model(self, x):
        """
        Return (x_corr, is_acting): x corrected by the design space and, when
        inputs are given in folded space, unfolded for the wrapped surrogate.
        """
        xp = ensure_2d_array(x, "xp")

        x_corr, is_acting = self.design_space.correct_get_acting(xp)
        if self._input_in_folded_space:
            # Keep the first two results (samples, is_acting); slicing instead
            # of unpacking tolerates extra trailing values from unfold_x.
            x_corr, is_acting = self.design_space.unfold_x(
                x_corr, is_acting=is_acting
            )[:2]
        return x_corr, is_acting

    def _predict_values(self, x: np.ndarray, is_acting=None) -> np.ndarray:
        # Intentionally empty: predict_values is overridden above and
        # delegates directly to the wrapped surrogate.
        pass


class MixedIntegerContext:
    """
    Class which acts as sampling method and surrogate model factory
    to handle integer and categorical variables consistently.
    """

    def __init__(self, design_space, work_in_folded_space=True):
        """
        Parameters
        ----------
        design_space: BaseDesignSpace
            the design space definition (includes mixed-discrete and/or hierarchical specifications)
        work_in_folded_space: bool
            whether x data are given in folded space (enum indexes) or not (enum masks)
        """
        self._design_space = ensure_design_space(design_space=design_space)
        self._unfold_space = not work_in_folded_space
        self._unfolded_xlimits = self._design_space.get_unfolded_num_bounds()
        self._work_in_folded_space = work_in_folded_space

    @property
    def design_space(self) -> BaseDesignSpace:
        """Design space used by this context."""
        return self._design_space

    def build_sampling_method(self, random_state=None):
        """
        Build a sampling function for the design space.

        Returns a callable ``sample(n)`` which draws n points through
        ``design_space.sample_valid_x`` with the given random_state, in folded
        or unfolded space according to ``work_in_folded_space``.
        """
        # Captured once: later changes to the context do not affect the sampler.
        return_folded = self._work_in_folded_space

        def sample(n):
            x, _ = self._design_space.sample_valid_x(
                n, unfolded=not return_folded, random_state=random_state
            )
            return x

        return sample

    def build_kriging_model(self, surrogate):
        """
        Build MixedIntegerKrigingModel from given SMT surrogate model.

        The surrogate options 'design_space' and 'hyper_opt' are overwritten:
        the design space of this context is used and the optimizer is set to
        "Cobyla" (with a warning when it was "TNC").
        """
        surrogate.options["design_space"] = self._design_space
        if surrogate.options["hyper_opt"] == "TNC":
            warnings.warn(
                "TNC not available yet for mixed integer handling. Switching to Cobyla"
            )
        surrogate.options["hyper_opt"] = "Cobyla"
        return MixedIntegerKrigingModel(
            surrogate=surrogate,
            input_in_folded_space=self._work_in_folded_space,
        )

    def build_surrogate_model(self, surrogate):
        """
        Build MixedIntegerSurrogateModel from given SMT surrogate model.
        """
        return MixedIntegerSurrogateModel(
            self._design_space,
            surrogate=surrogate,
            input_in_folded_space=self._work_in_folded_space,
        )

    def get_unfolded_xlimits(self):
        """
        Returns relaxed xlimits
        Each level of an enumerate gives a new continuous dimension in [0, 1].
        Each integer dimension is relaxed continuously.
        """
        return self._unfolded_xlimits

    def get_unfolded_dimension(self):
        """
        Returns x dimension (int) taking into account unfolded categorical features
        """
        return len(self._unfolded_xlimits)