Source code for smt.applications.podi

"""
Author: Hugo Reimeringer <hugo.reimeringer@onera.fr>
"""

from sklearn.decomposition import PCA
import numpy as np

from smt.applications.application import SurrogateBasedApplication
from smt.surrogate_models import KRG, KPLS, KPLSK
from smt.utils.checks import ensure_2d_array

PODI_available_models = {
    "KRG": KRG,
    "KPLS": KPLS,
    "KPLSK": KPLSK,
}


[docs] class PODI(SurrogateBasedApplication): """ Class for Proper Orthogonal Decomposition and Interpolation (PODI) surrogate models based. Attributes ---------- n_modes : int Number of kept modes during the POD. singular_vectors : np.ndarray Singular vectors of the POD. singular_values : np.ndarray Singular values of the POD. interp_coeff : list[SurrogateModel] List containing the surrogate models used. Examples -------- >>> from smt.applications import PODI >>> sm = PODI() """ name = "PODI" def _initialize(self) -> None: super(PODI, self)._initialize() self.available_models_name = [] for key in PODI_available_models.keys(): self.available_models_name.append(key) self.n_modes = None self.singular_vectors = None self.singular_values = None self.pod_computed = False self.interp_options_set = False self.training_values_set = False self.train_done = False self.interp_coeff = None @staticmethod def choice_n_modes_tol(EV_list: np.ndarray, tol: float) -> int: """ Static method calculating the required number of kept modes to explain at least the intended ratio of variance. Parameters ---------- EV_list : np.ndarray List of the explained variance of each mode in the POD. tol : float Desired tolerance for the POD. It is expected to be in ]0, 1]. Returns ------- n_modes : int Kept modes according to the tolerance. """ sum_tot = sum(EV_list) sum_ = 0 for i, ev in enumerate(EV_list): sum_ += ev EV_ratio = sum_ / sum_tot if EV_ratio >= tol: return i + 1 return len(EV_list)
[docs] def compute_pod( self, database: np.ndarray, tol: float = None, n_modes: int = None, seed: int = None, ) -> None: """ Performs the POD. Parameters ---------- database : np.ndarray[ny, nt] Snapshot matrix. Each column corresponds to a snapshot. tol : float Desired tolerance for the pod (if n_modes not set). n_modes : int Desired number of kept modes for the pod (if tol not set). seed : int seed number which controls random draws for internal optim. (optional) Examples ---------- >>> sm.compute_pod(database, tol = 0.99) """ choice_svd = None svd = PCA(svd_solver="randomized", random_state=seed) if n_modes is not None: self.n_modes = n_modes choice_svd = "mode" if tol is not None: if choice_svd is not None: raise ValueError( "pod can't use both arguments 'n_modes' and 'tol' at the same time" ) else: choice_svd = "tol" if choice_svd is None: raise ValueError( "either one of the arguments 'n_modes' and 'tol' must be specified" ) database = ensure_2d_array(database, "database") self.n_snapshot = database.shape[1] self.ny = database.shape[0] svd.fit(database.T) self.singular_vectors = svd.components_.T self.singular_values = svd.singular_values_ EV_list = svd.explained_variance_ if choice_svd == "tol": self.n_modes = PODI.choice_n_modes_tol(EV_list, tol) else: if self.n_modes > self.n_snapshot: raise ValueError( "the number of kept modes can't be superior to the number of data values (snapshots)" ) self.EV_ratio = sum(EV_list[: self.n_modes]) / sum(EV_list) self.mean = np.atleast_2d(database.mean(axis=1)).T self.basis = self.singular_vectors[:, : self.n_modes] self.coeff = np.dot(database.T - self.mean.T, self.basis) self.pod_computed = True self.interp_options_set = False self.training_values_set = False
[docs] def get_singular_vectors(self) -> np.ndarray: """ Getter for the singular vectors of the POD. It represents the directions of maximum variance in the data. Returns ------- singular_vectors : np.ndarray singular vectors of the POD. """ return self.singular_vectors
[docs] def get_singular_values(self) -> np.ndarray: """ Getter for the singular values from the Sigma matrix of the POD. Returns ------- singular_values : np.ndarray Singular values of the POD. """ return self.singular_values
[docs] def get_ev_ratio(self) -> float: """ Getter for the explained variance ratio with the kept modes. Returns ------- EV_ratio : float Explained variance ratio with the current kept modes. """ return self.EV_ratio
[docs] def get_n_modes(self) -> int: """ Getter for the number of modes kept during the POD. Returns ------- n_modes : int number of modes kept during the POD. """ return self.n_modes
[docs] def set_interp_options( self, interp_type: str = "KRG", interp_options: list = [{}] ) -> None: """ Set the options for the interpolation surrogate models used. Only required if a model different than KRG is used or if non-default options are desired for the models. Parameters ---------- interp_type : str Name of the type of surrogate model that will be used for the whole set. By default, the Kriging model is used (KRG). interp_options : list[dict] List containing dictionnaries for the options. The k-th dictionnary corresponds to the options of the k-th interpolation model. If the options are common to all surogate models, only a single dictionnary is required in the list. The available options can be found in the documentation of the corresponding surrogate models. By default, the print_global options are set to 'False'. Examples -------- >>> interp_type = "KRG" >>> dict1 = {'corr' : 'matern52', 'theta0' : [1e-2]} >>> dict2 = {'poly' : 'quadratic'} >>> interp_options = [dict1, dict2] >>> sm.set_interp_options(interp_type, interp_options) """ if not self.pod_computed: raise RuntimeError( "'compute_pod' method must have been succesfully executed before trying to set the models options." ) if interp_type not in PODI_available_models.keys(): raise ValueError( f"the surrogate model type should be one of the following : {', '.join(self.available_models_name)}" ) if len(interp_options) == self.n_modes: mode_options = "local" elif len(interp_options) == 1: mode_options = "global" else: raise ValueError( f"expected interp_options of size {self.n_modes} or 1, but got {len(interp_options)}." ) self.interp_coeff = [] for i in range(self.n_modes): if mode_options == "local": index = i elif mode_options == "global": index = 0 sm_i = PODI_available_models[interp_type](print_global=False) for key in interp_options[index].keys(): sm_i.options[key] = interp_options[index][key] self.interp_coeff.append(sm_i) self.interp_options_set = True self.training_values_set = False
[docs] def set_training_values(self, xt: np.ndarray) -> None: """ Set training data (values). If the models' options are still not set, default values are used for the initialization. Parameters ---------- xt : np.ndarray[nt, nx] The input values for the nt training points. """ xt = ensure_2d_array(xt, "xt") if not self.pod_computed: raise RuntimeError( "'compute_pod' method must have been succesfully executed before trying to set the training values." ) if not self.interp_options_set: self.interp_coeff = [] for i in range(self.n_modes): sm_i = PODI_available_models["KRG"](print_global=False) self.interp_coeff.append(sm_i) self.interp_options_set = True self.nt = xt.shape[0] self.nx = xt.shape[1] if self.nt != self.n_snapshot: raise ValueError( f"there must be the same amount of train values than data values, {self.nt} != {self.n_snapshot}." ) for i in range(self.n_modes): self.interp_coeff[i].set_training_values(xt, self.coeff[:, i]) self.training_values_set = True
[docs] def train(self) -> None: """ Performs the training of the model. """ if not self.pod_computed: raise RuntimeError( "'compute_pod' method must have been succesfully executed before trying to train the models." ) if not self.training_values_set: raise RuntimeError( "the training values should have been set before trying to train the models." ) for interp_coeff in self.interp_coeff: interp_coeff.train() self.train_done = True
[docs] def get_interp_coeff(self) -> np.ndarray: """ Getter for the list of the interpolation surrogate models used Returns ------- interp_coeff : np.ndarray[n_modes] List of the kriging models used for the POD coefficients. """ return self.interp_coeff
[docs] def predict_values(self, xn) -> np.ndarray: """ Predict the output values at a set of points. Parameters ---------- xn : np.ndarray Input values for the prediction points. Returns ------- yn : np.ndarray Output values at the prediction points. """ if not self.train_done: raise RuntimeError( "the model should have been trained before trying to make a prediction." ) self.dim_new = xn.shape[1] if self.dim_new != self.nx: raise ValueError( f"the data values and the new values must be the same size, here {self.dim_new} != {self.nx}" ) self.n_new = xn.shape[0] mean_coeff_interp = np.zeros((self.n_modes, self.n_new)) for i in range(self.n_modes): mu_i = self.interp_coeff[i].predict_values(xn) mean_coeff_interp[i] = mu_i[:, 0] y = self.mean + np.dot(self.basis, mean_coeff_interp) return y
[docs] def predict_variances(self, xn) -> np.ndarray: """ Predict the variances at a set of points. Parameters ---------- xn : np.ndarray[nt, nx] Input values for the prediction points. Returns ------- s2 : np.ndarray[nt, ny] Variances. """ if not self.train_done: raise RuntimeError( "the model should have been trained before trying to make a prediction" ) dim_new = xn.shape[1] if dim_new != self.nx: raise ValueError( f"the data values and the new values must be the same size, here {self.dim_new} != {self.nx}" ) self.n_new = xn.shape[0] var_coeff_interp = np.zeros((self.n_modes, self.n_new)) for i in range(self.n_modes): sigma_i_square = self.interp_coeff[i].predict_variances(xn) var_coeff_interp[i] = sigma_i_square[:, 0] s2 = np.dot((self.basis**2), var_coeff_interp) return s2
[docs] def predict_derivatives(self, xn, kx) -> np.ndarray: """ Predict the dy_dx derivatives at a set of points. Parameters ---------- xn : np.ndarray[nt, nx] Input values for the prediction points. kx : int The 0-based index of the input variable with respect to which derivative is desired. Returns ------- dy_dx : np.ndarray[nt, ny] Derivatives. """ d = kx if not self.train_done: raise RuntimeError( "the model should have been trained before trying to make a prediction" ) dim_new = xn.shape[1] if d >= dim_new: raise ValueError( "the desired derivative kx should correspond to a dimension of the data, here kx is out of bounds." ) if dim_new != self.nx: raise ValueError( f"the data values and the new values must be the same size, here {self.dim_new} != {self.nx}" ) self.n_new = xn.shape[0] deriv_coeff_interp = np.zeros((self.n_modes, self.n_new)) for i in range(self.n_modes): deriv_coeff_interp[i] = self.interp_coeff[i].predict_derivatives(xn, d)[ :, 0 ] dy_dx = np.dot(self.basis, deriv_coeff_interp) return dy_dx