"""
Author: Hugo Reimeringer <hugo.reimeringer@onera.fr>
"""
import scipy.optimize as opt
from sklearn.utils.extmath import randomized_svd
from sklearn.decomposition import PCA
import numpy as np
from smt.applications.application import SurrogateBasedApplication
from smt.surrogate_models import KPLS, KPLSK, KRG
from smt.utils.checks import ensure_2d_array
PODI_available_models = {
"KRG": KRG,
"KPLS": KPLS,
"KPLSK": KPLSK,
}
class SubspacesInterpolation:
"""
Class that computes an interpolation of POD bases.
Attributes
----------
n_DoE : int
Size of the DoE (Design of Experiments)
n_mu : int
Dimension of the parametric space.
mu_train : np.ndarray[n_DoE, n_mu]
Database of parameter values where the POD bases have been computed.
n_DoF : int
Number of degrees of freedom.
n_modes : int
Number of modes of the POD bases.
bases : np.ndarray[n_DoF, n_modes, n_DoE]
Database of the POD bases to interpolate.
print_global : bool
Indicates if the informational prints (except the warnings) should be displayed.
"""
def __init__(
self, DoE_mu, DoE_bases, print_global, snapshots=None, U_ref=None
) -> None:
"""
Initialization.
Parameters
----------
DoE_mu : np.ndarray[n_DoE, n_mu]
Database of parameter values where the POD bases have been computed.
DoE_bases : np.ndarray[n_DoF, n_modes, n_DoE]
Database of the POD bases to interpolate.
print_global : bool
Indicates if the informational prints (except the warnings) should be displayed.
snapshots : np.ndarray, optional
Database of snapshots associated with the POD bases (used to set n_t).
U_ref : np.ndarray, optional
Reference POD basis.
"""
self.print_global = print_global
self.mu_train = DoE_mu
self.bases = DoE_bases
self.n_DoE = DoE_mu.shape[0]
self.n_mu = DoE_mu.shape[1]
self.n_DoF = DoE_bases.shape[0]
self.n_modes = DoE_bases.shape[1]
self.snapshots = snapshots
if self.snapshots is not None:
self.n_t = self.snapshots.shape[1]
self.U_ref = U_ref
def exponential_mapping(self, Y0, Ti) -> np.ndarray:
"""
Function that computes the exponential mapping of the Grassmann manifold at point Y0.
Parameters
----------
Y0 : np.ndarray[n_DoF, n_modes]
A point on the Grassmann manifold Gr(n_DoF,n_modes).
Ti : np.ndarray[n_new, n_real, n_DoF, n_modes]
A batch of vectors in the tangent plane of Gr(n_DoF,n_modes) at Y0.
Returns
-------
Yi : np.ndarray[n_new, n_real, n_DoF, n_modes]
Points on Gr(n_DoF,n_modes).
The images of Ti by the exponential mapping at Y0. $$Yi=exp_{Y0}(Ti)$$
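Examples
--------
A minimal sketch (the arrays are illustrative, `interp` is an existing instance);
Ti carries two leading batch axes, so a single tangent vector must be reshaped accordingly:
>>> Y0 = np.linalg.qr(np.random.rand(100, 3))[0]
>>> Ti = np.zeros((1, 1, 100, 3))
>>> Yi = interp.exponential_mapping(Y0, Ti)  # Yi[0, 0] spans the same subspace as Y0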
"""
# batched thin SVD of the tangent vectors: Ti = u @ diag(s) @ v
u, s, v = np.linalg.svd(Ti, full_matrices=False)
n_new = u.shape[0]
n_real = u.shape[1]
# build the batched diagonal matrices cos(s) and sin(s)
M = np.repeat(np.eye(u.shape[3])[:, :, np.newaxis], n_real, axis=2)
M = np.repeat(M[:, :, :, np.newaxis], n_new, axis=3)
coss = M * np.cos(s).T
coss = np.transpose(coss, (3, 2, 0, 1))
sins = M * np.sin(s).T
sins = np.transpose(sins, (3, 2, 0, 1))
vt = np.transpose(v, (0, 1, 3, 2))
# geodesic formula: Yi = Y0 @ v.T @ cos(s) + u @ sin(s)
Yi = Y0 @ (vt @ coss) + u @ sins
return Yi
def logarithmic_mapping(self, Y0, Yi) -> np.ndarray:
"""
Function that computes the logarithmic mapping of the Grassmann manifold at point Y0.
Parameters
----------
Y0: np.ndarray[n_DoF,n_modes]
A point on the Grassmann manifold Gr(n_DoF,n_modes)
Yi: np.ndarray[n_DoF,n_modes]
A point on the Grassmann manifold Gr(n_DoF,n_modes)
Returns
-------
Ti: np.ndarray[n_DoF,n_modes]
A vector on the tangent plane of Gr(n_DoF,n_modes) at Y0.
The image of Yi by the logarithmic mapping at Y0. $$Ti=log_{Y0}(Yi)$$
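Examples
--------
A minimal sketch (assuming `interp`, Y0 and Yi already exist); the logarithmic
mapping inverts the exponential mapping, so mapping Ti back through it
recovers the subspace spanned by Yi:
>>> Ti = interp.logarithmic_mapping(Y0, Yi)
>>> Yi_back = interp.exponential_mapping(Y0, Ti[np.newaxis, np.newaxis])[0, 0]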
"""
# X = (Y0.T @ Yi)^{-1}
X = np.linalg.solve(np.dot(Y0.T, Yi), np.eye(Y0.shape[1]))
M = np.dot(Yi, X) - Y0
# thin SVD of M, then Ti = u @ diag(arctan(s)) @ v
u, s, v = np.linalg.svd(M, full_matrices=False)
arctans = np.eye(Y0.shape[1]) * np.arctan(s)
Ti = np.dot(u, np.dot(arctans, v))
return Ti
def log_DoE(self, Y0) -> np.ndarray:
"""
Function that computes the logarithmic mapping of the DoE at a given point Y0.
Parameters
----------
Y0: np.ndarray[n_DoF,n_modes]
A point on the Grassmann manifold Gr(n_DoF,n_modes).
Returns
-------
T_train: np.ndarray[n_DoF,n_modes,n_DoE]
The n_DoE tangent vectors at Y0, images of the DoE bases by the logarithmic mapping at Y0.
"""
T_train = np.zeros((self.n_DoF, self.n_modes, self.n_DoE))
for i in range(self.n_DoE):
T_train[:, :, i] = self.logarithmic_mapping(Y0, self.bases[:, :, i])
return T_train
def compute_Frechet_mean(
self, P0, itermax=20, epsilon=1e-6, alpha_0=1e-3, optimal_step=True
):
"""
Function that computes the Frechet mean of the set DoE_bases.
Parameters
----------
P0: np.ndarray[n_DoF,n_modes]
Initial guess for the Frechet mean.
itermax: int, default = 20
Maximum number of iterations in the gradient descent.
epsilon: float, default = 1e-6
Threshold for the stopping criterion.
alpha_0: float, default = 1e-3
Initial guess for the step size of the gradient descent algorithm.
optimal_step: bool, default = True
Use an optimal step size.
Returns
-------
P_star: np.ndarray[n_DoF, n_modes]
Computed Frechet mean.
obj_fun: list[np.ndarray]
History of the squared geodesic distances to the DoE bases at each iteration.
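Examples
--------
A minimal sketch, using the first basis of the DoE as the initial guess:
>>> P_star, obj_fun = interp.compute_Frechet_mean(P0=interp.bases[:, :, 0])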
"""
it = 0
cv = 1
if optimal_step:
# define the function that will be optimized to find the optimal step size
def obj_fun_i(alpha, P_i, delta_P):
Ti = alpha * delta_P
P_i_temp = self.exponential_mapping(
P_i[np.newaxis, np.newaxis, :, :], Ti[np.newaxis, np.newaxis, :, :]
)
delta_P_temp = self.log_DoE(P_i_temp[0, 0])
delta_P_temp_norm = np.linalg.norm(delta_P_temp, axis=(0, 1)) ** 2
return delta_P_temp_norm.sum()
i = 0
obj_fun = []
while it < itermax and cv > epsilon:
# compute the gradient
delta_P = self.log_DoE(P0)
obj_fun.append(np.linalg.norm(delta_P, axis=(0, 1)) ** 2)
delta_P = delta_P.sum(axis=2)
if optimal_step:
res = opt.minimize(obj_fun_i, alpha_0, args=(P0, delta_P))
step = res["x"]
else:
step = alpha_0
delta_P = step * delta_P
P_i = self.exponential_mapping(
P0[np.newaxis, np.newaxis, :, :], delta_P[np.newaxis, np.newaxis, :, :]
)[0, 0]
if i > 0:
cv = np.linalg.norm(obj_fun[i] - obj_fun[i - 1]) / np.linalg.norm(
obj_fun[i]
)
P0 = P_i
i += 1
it += 1
P_star = P_i
return P_star, obj_fun
def compute_tangent_plane_basis_and_DoE_coordinates(
self, Y0, epsilon=1e-3, compute_GP=True
):
"""
Function that computes a basis of a subspace of the tangent plane at Y0,
from a set of vectors that belong to this tangent plane,
together with the coefficients of the tangent vectors in this basis.
Parameters
----------
Y0: np.ndarray[n_DoF,n_modes]
A point on the Grassmann manifold Gr(n_DoF,n_modes).
epsilon: float
Truncation parameter: the basis is created by keeping the first n_B modes such that
$$\frac{\sum_{i=1}^{n_B} s_i}{\sum_{i=1}^{n_{DoE}} s_i} \geq 1 - \epsilon$$
compute_GP: bool (default=True)
Computes the GP approximation of each coefficient
Returns
-------
Basis: np.ndarray[n_DoF*n_modes,n_B]
Basis of the subspace of the tangent plane of interest.
alpha: np.ndarray[n_B, n_DoE]
Generalized coefficients of the tangent vectors in the new basis.
Z_mean: np.ndarray[n_DoF*n_modes,]
Mean value of the flattened tangent vectors.
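Examples
--------
A minimal sketch, taking the first basis of the DoE as the reference point:
>>> B, alpha, Z_mean = interp.compute_tangent_plane_basis_and_DoE_coordinates(Y0=interp.bases[:, :, 0])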
"""
self.Y0 = Y0
# compute the vectors on the tangent plane at Y0
T_train = self.log_DoE(Y0)
# flatten the tangent vectors
Z = np.zeros((self.n_DoF * self.n_modes, self.n_DoE))
for i in range(self.n_DoE):
Z[:, i] = T_train[:, :, i].flatten()
self.Z_mean = Z.mean(axis=1)
Z_centered = (Z.T - self.Z_mean).T
# svd of the Z matrix:
u, s, _ = randomized_svd(Z_centered, n_components=self.n_DoE, random_state=0)
# Information about the truncation
self.n_B = np.argwhere(s.cumsum() / s.sum() >= 1 - epsilon)[0, 0] + 1
if self.print_global:
print(
"The Grassmann manifold of interest is of dimension "
+ str((self.n_DoF - self.n_modes) * self.n_modes)
)
print("The number of tangent vectors is " + str(self.n_DoE))
print(
"The dimension of the subspace of the tangent plane is " + str(self.n_B)
)
if self.n_B == self.n_DoE:
print(
"WARNING: the dimension of the tangent plane's subspace is equal to the DoE size."
)
print("Consider increasing the DoE.")
# truncation
self.Basis = u[:, : self.n_B]
# projection to get the coefficients in this basis
self.alpha = np.dot(self.Basis.T, Z_centered)
if compute_GP:
self.compute_GP(self.alpha)
return self.Basis, self.alpha, self.Z_mean
def compute_GP(self, alpha, kernel="matern52") -> None:
"""
Function that computes the GP interpolation functions of each alpha coefficient.
Parameters
----------
alpha: np.ndarray[n_B, n_DoE]
Set of generalized coefficients expressing the tangent vectors in a subspace of the tangent plane.
kernel: str
Choice of the kernel of the GPs; see the SMT documentation.
"""
self.GP = []
n = self.n_B
for i in range(n):
gp = KRG(
theta0=[1e-2] * self.n_mu,
print_prediction=False,
corr=kernel,
print_global=False,
)
gp.set_training_values(self.mu_train, alpha[i, :])
gp.train()
self.GP.append(gp)
def pred_coeff(self, mu, compute_var=False):
"""
Function that interpolates the coefficients of the tangent vector at n_new parametric points mu.
Parameters
----------
mu: np.ndarray[n_new,n_mu]
Parametric points where the interpolation should be provided.
compute_var: bool (default=False)
Computes the variance of the interpolation.
Returns
-------
coeff: np.ndarray[n_B, n_new]
Mean value of the prediction.
var: np.ndarray[n_B, n_new]
Variance of the prediction (only returned if compute_var is True).
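Examples
--------
A minimal sketch (mu is illustrative), once the tangent plane basis and the GPs have been computed:
>>> mu = np.array([[0.5]])
>>> coeff, var = interp.pred_coeff(mu, compute_var=True)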
"""
n_new = mu.shape[0]
n = self.n_B
coeff = np.zeros((n, n_new))
for i in range(n):
coeff[i] = self.GP[i].predict_values(mu)[:, 0]
if not compute_var:
return coeff
var = np.zeros((n, n_new))
for i in range(n):
var[i] = self.GP[i].predict_variances(mu)[:, 0]
return coeff, var
def interp_POD_basis(
self, mu, compute_realizations=False, n_real=1, fixed_xi=False, xi=None
):
"""
Function that computes the interpolation of the POD basis at n_new parametric points mu.
Parameters
----------
mu: np.ndarray[n_new,n_mu]
Parametric points where the interpolation should be provided.
compute_realizations: bool (default=False)
Compute n_real random realizations of the interpolation.
n_real: int
Number of random realizations.
fixed_xi: bool (default=False)
Use the provided xi instead of drawing new normal random vectors.
xi: np.ndarray[n_B, n_real]
Normal random vectors used for the realizations (only if fixed_xi is True).
Returns
-------
Yi: np.ndarray[n_new, 1, n_DoF, n_modes]
POD bases at points mu (returned if compute_realizations is False).
Basis: np.ndarray[n_new, n_DoF, n_modes]
Mean (MAP) POD bases at points mu (returned if compute_realizations is True).
Basis_real: np.ndarray[n_new, n_real, n_DoF, n_modes]
Random realizations of the POD bases at points mu (returned if compute_realizations is True).
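Examples
--------
A minimal sketch at a single new parametric point (the value is illustrative):
>>> mu = np.array([[0.5]])
>>> Yi = interp.interp_POD_basis(mu)
>>> Y_map, Y_real = interp.interp_POD_basis(mu, compute_realizations=True, n_real=10)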
"""
n_new = mu.shape[0]
if not (compute_realizations):
# compute the interpolation of the coefficients
coeff = self.pred_coeff(mu)
# compute the corresponding tangent vectors
Zi = np.dot(self.Basis, coeff) + self.Z_mean[:, np.newaxis]
# injectivity test: the mapping stays injective only if the principal angles (singular values) remain below pi/2
pca = PCA(svd_solver="randomized", random_state=42)
pca.fit(Zi)
singular_values = pca.singular_values_
keep_injectivity = max(singular_values) < np.pi / 2
if not keep_injectivity:
print(
"WARNING: there is a loss of injectivity for the interpolation of subspaces."
)
# reshape to get the matrices
Ti = Zi.reshape((self.n_DoF, self.n_modes, n_new, 1))
# transpose to apply the SVD in exp map to each matrix
Ti = np.transpose(Ti, (2, 3, 0, 1))
ind = abs(Ti) < 1e-12
Ti[ind] = 0.0
# exponential mapping
Yi = self.exponential_mapping(self.Y0, Ti)
return Yi
elif compute_realizations:
if n_new != 1:
print(
f"WARNING: Sample of size {n_real} at {n_new} points of a {self.n_DoF}*{self.n_modes} matrix"
)
print(
f"Be sure to get sufficient memory for {n_real*n_new*self.n_DoF*self.n_modes} floats"
)
# compute the interpolation of the coefficients and the associated variance
coeff, var = self.pred_coeff(mu, compute_var=True)
# get a normal random vector
if not fixed_xi:
xi = np.random.normal(size=(len(coeff), n_real))
# we append a zeros vector so that the last realization is the MAP
xi = np.concatenate((xi, np.zeros((len(coeff), 1))), axis=1)
n_real = n_real + 1
var_coeff = np.sqrt(var).T[:, :, np.newaxis] * xi
coeff_MC = (coeff.T)[:, :, np.newaxis] + var_coeff
# compute the tangent vector
Zi = np.dot(self.Basis, coeff_MC) + self.Z_mean[:, np.newaxis, np.newaxis]
# reshape to get the matrix
Ti = Zi.reshape((self.n_DoF, self.n_modes, n_new, n_real))
# transpose to apply the SVD in exp map to each matrix
Ti = np.transpose(Ti, (2, 3, 0, 1))
# exponential mapping
Yi = self.exponential_mapping(self.Y0, Ti)
return Yi[:, -1, :, :], Yi[:, :-1, :, :]
class PODI(SurrogateBasedApplication):
"""
Class for surrogate models based on Proper Orthogonal Decomposition and Interpolation (PODI).
Attributes
----------
pod_type : str
Indicates which type of POD should be performed ('global' or 'local')
nx : int
Dimension of the inputs in the DoE.
n_snapshot : int
Number of snapshots in the database.
ny : int
Dimension of the vector associated with a snapshot.
database : np.ndarray[ny, n_snapshot]
Database containing the vectorial snapshots.
n_modes : int
Number of kept modes during the POD.
basis : np.ndarray[ny, n_modes]
POD basis.
EV_ratio : float
Ratio of explained variance according to the kept modes during the POD (only for global POD).
singular_vectors : np.ndarray
Singular vectors of the POD (only for global POD).
singular_values : np.ndarray
Singular values of the POD (only for global POD).
interp_coeff : list[SurrogateModel]
List containing the surrogate models used during the interpolation.
Examples
--------
>>> from smt.applications import PODI
>>> sm = PODI()
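A minimal end-to-end sketch (database, xt and xn are illustrative arrays
of shapes [ny, n_snapshot], [n_snapshot, nx] and [n_new, nx]):
>>> sm.compute_pod(database, tol=0.9999)
>>> sm.set_training_values(xt)
>>> sm.train()
>>> yn = sm.predict_values(xn)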
"""
name = "PODI"
def _initialize(self) -> None:
super(PODI, self)._initialize()
self.available_models_name = []
for key in PODI_available_models.keys():
self.available_models_name.append(key)
self.pod_type = None
self.n_modes = None
self.singular_vectors = None
self.singular_values = None
self.basis = None
self.pod_computed = False
self.interp_options_set = False
self.training_values_set = False
self.train_done = False
self.interp_coeff = None
@staticmethod
def choice_n_modes_tol(EV_list: np.ndarray, tol: float) -> int:
"""
Static method calculating the required number of kept modes to explain at least the intended ratio of variance.
Parameters
----------
EV_list : np.ndarray
List of the explained variance of each mode in the POD.
tol : float
Desired tolerance for the POD. It is expected to be in ]0, 1].
Returns
-------
n_modes : int
Kept modes according to the tolerance.
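Examples
--------
With 80% of the variance on the first mode, a 0.9 tolerance needs two modes:
>>> PODI.choice_n_modes_tol(np.array([0.8, 0.15, 0.05]), 0.9)
2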
"""
sum_tot = sum(EV_list)
sum_ = 0
for i, ev in enumerate(EV_list):
sum_ += ev
EV_ratio = sum_ / sum_tot
if EV_ratio >= tol:
return i + 1
return len(EV_list)
@staticmethod
def interp_subspaces(
xt1,
input_matrices,
xn1,
print_global=True,
ref_index=0,
frechet=False,
frechet_guess=None,
compute_realizations=False,
n_realizations=1,
) -> list:
"""
Static method computing the interpolation of subspaces.
Parameters
----------
xt1 : np.ndarray[nt1, dim]
The input scalar values corresponding to each local basis.
dim = dimension of the parametric space for the local bases
input_matrices : list[np.ndarray[ny, n_modes]]
List containing the local bases for the subspaces' interpolation.
Each matrix is associated with a scalar value from xt1.
xn1 : np.ndarray[nn, dim]
The scalar values at which we want to compute a local basis.
dim = dimension of the parametric space for the local bases
print_global : bool
Indicates if the informational prints (except the warnings) should be displayed. Default value is True.
ref_index : int
Index of the local basis from 'input_matrices' we want to use as the reference point. Default value is 0.
(Only if frechet is set to False)
frechet : bool
Indicates if the Frechet mean should be computed and used as the reference point.
frechet_guess : np.ndarray[ny, n_modes]
Initial guess for the Frechet mean. Default value is the first local basis of 'input_matrices'.
(Only if frechet is set to True)
compute_realizations : bool
Indicates if some realizations of random POD bases should be computed.
n_realizations : int
In case compute_realizations is set to True, indicates the number of realizations that should be computed.
Returns
-------
bases : list[np.ndarray[ny, n_modes]]
List of the output bases at each desired value of xn1.
bases_realizations : list[list[np.ndarray[ny, n_modes]]]
List of realizations of the output bases. (Random POD bases)
Returned only if compute_realizations is set to True.
The list contains, for each value of xn1, a list of the associated realizations.
Examples
--------
# normal case
>>> bases = PODI.interp_subspaces(xt1=xt1, input_matrices=matrices, xn1=xn1)
# use of the Frechet mean
>>> bases = PODI.interp_subspaces(
xt1=xt1, input_matrices=matrices, xn1=xn1,
frechet=True,
frechet_guess=frechet_matrix
)
"""
nt1 = xt1.shape[0]
n_bases = len(input_matrices)
if nt1 != n_bases:
raise ValueError(
f"the xt1 rows should correspond to each basis, there are {nt1} xt1 rows, but {n_bases} input bases."
)
nn = xn1.shape[0]
ny, n_modes = input_matrices[0].shape
DoE_bases = np.zeros((ny, n_modes, n_bases))
# TODO: check that the output matrices have the right dimensions, same for the realizations
first_dim = input_matrices[0].shape
for i, basis in enumerate(input_matrices):
if basis.shape != first_dim:
raise ValueError("The input matrices should have the same dimensions.")
DoE_bases[:, :, i] = basis
interp = SubspacesInterpolation(
DoE_mu=xt1, DoE_bases=DoE_bases, print_global=print_global
)
if frechet:
if frechet_guess is not None:
P0 = frechet_guess
else:
P0 = input_matrices[0]
Y0_frechet, _ = interp.compute_Frechet_mean(P0=P0)
Y0 = Y0_frechet
else:
Y0 = input_matrices[ref_index]
interp.compute_tangent_plane_basis_and_DoE_coordinates(Y0=Y0)
if compute_realizations:
Yi_full, Yi_full2 = interp.interp_POD_basis(
xn1,
compute_realizations=True,
n_real=n_realizations,
)
yi = Yi_full
interpolated_bases = []
yi_real = []
for real in range(Yi_full2.shape[1]):
yi_real.append(Yi_full2[:, real, :, :])
interpolated_bases_real = []
for i in range(nn):
interpolated_basis = yi[i, :, :]
interpolated_bases.append(interpolated_basis)
realizations_i = []
for j in range(n_realizations):
realization_j_of_i = yi_real[j][i, :, :]
realizations_i.append(realization_j_of_i)
interpolated_bases_real.append(realizations_i)
return interpolated_bases, interpolated_bases_real
else:
Yi_full = interp.interp_POD_basis(xn1, compute_realizations=False)
yi = Yi_full[:, 0, :, :]
interpolated_bases = []
for i in range(nn):
interpolated_basis = yi[i, :, :]
interpolated_bases.append(interpolated_basis)
return interpolated_bases
def compute_global_pod(
self,
tol: float = None,
n_modes: int = None,
seed: int = None,
) -> None:
"""
Performs the global POD.
Parameters
----------
tol : float
Desired tolerance for the pod (if n_modes not set).
n_modes : int
Desired number of kept modes for the pod (if tol not set).
seed : int
Seed number which controls random draws for internal optim. (optional)
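Examples
--------
A minimal sketch; this method is normally called through 'compute_pod',
which stores the database beforehand:
>>> sm.compute_global_pod(tol=0.9999, seed=42)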
"""
choice_svd = None
svd = PCA(svd_solver="randomized", random_state=seed)
if n_modes is not None:
self.n_modes = n_modes
choice_svd = "mode"
if tol is not None:
if choice_svd is not None:
raise ValueError(
"pod can't use both arguments 'n_modes' and 'tol' at the same time"
)
else:
choice_svd = "tol"
if choice_svd is None:
raise ValueError(
"pod needs atleast one of the arguments 'n_modes' or 'tol'"
)
svd.fit(self.database.T)
self.singular_vectors = svd.components_.T
self.singular_values = svd.singular_values_
EV_list = svd.explained_variance_
if choice_svd == "tol":
self.n_modes = PODI.choice_n_modes_tol(EV_list, tol)
else:
if n_modes > self.n_snapshot:
raise ValueError(
"the number of kept modes can't be superior to the number of data values (snapshots)"
)
self.EV_ratio = sum(EV_list[: self.n_modes]) / sum(EV_list)
self.EV_list = EV_list
self.basis = self.singular_vectors[:, : self.n_modes]
def compute_pod(
self,
database: np.ndarray,
pod_type: str = "global",
tol: float = None,
n_modes: int = None,
seed: int = None,
local_basis: np.ndarray = None,
) -> None:
"""
Performs the POD (global or local).
Parameters
----------
database : np.ndarray[ny, n_snapshot]
Snapshot matrix. Each column corresponds to a snapshot.
pod_type : str
Name of the pod type that should be performed: 'global' or 'local'. Default value is 'global'.
tol : float
Desired tolerance for the pod (if n_modes not set).
Only for global POD, pod_type set to 'global'.
n_modes : int
Desired number of kept modes for the pod (if tol not set).
Only for global POD, pod_type set to 'global'.
seed : int
Seed number which controls random draws for internal optim. (optional)
Only for global POD, pod_type set to 'global'.
local_basis : np.ndarray[ny, n_modes]
Local basis used for the local POD.
Examples
--------
#global POD
>>> sm.compute_pod(database, pod_type = 'global', tol = 0.99)
#local POD
>>> sm.compute_pod(database, pod_type = 'local', local_basis = basis)
"""
self.database = ensure_2d_array(database, "database")
self.n_snapshot = database.shape[1]
self.ny = database.shape[0]
self.mean = np.atleast_2d(database.mean(axis=1)).T
if pod_type == "global":
self.compute_global_pod(tol=tol, n_modes=n_modes, seed=seed)
elif pod_type == "local":
if local_basis is None:
raise ValueError("'local_basis' should be specified")
self.n_modes = local_basis.shape[1]
ny = local_basis.shape[0]
if ny != self.ny:
raise ValueError(
f"the first dimension of the database and the local basis must be the same, {ny} != {self.ny}."
)
self.basis = local_basis
else:
raise ValueError(
f"the pod type should be 'global' or 'local', not {pod_type}."
)
self.coeff = np.dot(database.T - self.mean.T, self.basis)
self.pod_type = pod_type
self.pod_computed = True
self.interp_options_set = False
@staticmethod
def compute_pod_errors(
xt: np.ndarray,
database: np.ndarray,
interp_type: str = "KRG",
interp_options: list = [{}],
) -> list:
"""
Calculates different errors for the POD.
Parameters
----------
xt : np.ndarray[n_snapshot, nx]
The input values for the n_snapshot training points.
database : np.ndarray[ny, n_snapshot]
Snapshot matrix. Each column corresponds to a snapshot.
interp_type : str
Name of the type of surrogate model that will be used for the whole set.
By default, the Kriging model is used (KRG).
interp_options : list[dict]
List containing dictionaries for the options.
The k-th dictionary corresponds to the options of the k-th interpolation model.
If the options are common to all surrogate models, only a single dictionary is required in the list.
The available options can be found in the documentation of the corresponding surrogate models.
By default, the print_global option is set to 'False'.
Returns
-------
error_list : list[float]
List of 3 POD errors: projection error, interpolation error and total error (projection and interpolation).
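Examples
--------
A minimal sketch (xt and database are illustrative arrays of shapes
[n_snapshot, nx] and [ny, n_snapshot]):
>>> errors = PODI.compute_pod_errors(xt=xt, database=database)
>>> max_interp_error, max_proj_error, max_total_error = errors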
"""
xt = ensure_2d_array(xt, "xt")
nt = xt.shape[0]
n_snapshot = database.shape[1]
if nt != n_snapshot:
raise ValueError(
f"there must be the same amount of train values than data values, {nt} != {n_snapshot}."
)
max_interp_error = 0
max_proj_error = 0
max_total_error = 0
for n in range(n_snapshot):
reduced_database = np.concatenate(
(database[:, :n], database[:, n + 1 :]), axis=1
)
reduced_xt = np.concatenate((xt[:n], xt[n + 1 :]))
single_snapshot = np.atleast_2d(database[:, n]).T
single_xt = np.atleast_2d(xt[n])
podi = PODI()
podi.compute_pod(
database=reduced_database, n_modes=min(reduced_database.shape)
)
reduced_mean = np.atleast_2d(reduced_database.mean(axis=1)).T
reduced_basis = podi.get_singular_vectors()
n_modes = podi.get_n_modes()
true_coeff = np.atleast_2d(
np.dot(single_snapshot.T - reduced_mean.T, reduced_basis)
).T
recomposed = reduced_mean + reduced_basis.dot(true_coeff)
proj_error = recomposed - single_snapshot
rms_proj_error = np.sqrt(np.mean(proj_error**2))
max_proj_error = max(max_proj_error, rms_proj_error)
podi.set_interp_options(
interp_type=interp_type, interp_options=interp_options
)
podi.set_training_values(xt=reduced_xt)
podi.train()
reduced_interp_coeff = podi.get_interp_coeff()
mean_coeff_interp = np.zeros((n_modes, 1))
for i, coeff in enumerate(reduced_interp_coeff):
mu_i = coeff.predict_values(single_xt)
mean_coeff_interp[i] = mu_i[:, 0]
interp_error = mean_coeff_interp - true_coeff
rms_interp_error = np.sqrt(np.mean(interp_error**2))
max_interp_error = max(max_interp_error, rms_interp_error)
recomposed = reduced_mean + reduced_basis.dot(mean_coeff_interp)
total_error = recomposed - single_snapshot
rms_total_error = np.sqrt(np.mean(total_error**2))
max_total_error = max(max_total_error, rms_total_error)
return [max_interp_error, max_proj_error, max_total_error]
def get_singular_vectors(self) -> np.ndarray:
"""
Getter for the singular vectors of the global POD.
It represents the directions of maximum variance in the data.
Returns
-------
singular_vectors : np.ndarray
Singular vectors of the global POD.
"""
return self.singular_vectors
def get_basis(self) -> np.ndarray:
"""
Getter for the basis used for the POD.
Returns
-------
basis : np.ndarray
Basis of the POD.
"""
return self.basis
def get_singular_values(self) -> np.ndarray:
"""
Getter for the singular values from the Sigma matrix of the POD.
Returns
-------
singular_values : np.ndarray
Singular values of the POD.
"""
return self.singular_values
def get_ev_list(self) -> np.ndarray:
"""
Getter for the explained variance list.
Returns
-------
EV_list : np.ndarray
List of the explained variance of each mode in the POD.
"""
return self.EV_list
def get_ev_ratio(self) -> float:
"""
Getter for the explained variance ratio with the kept modes.
Returns
-------
EV_ratio : float
Explained variance ratio with the current kept modes.
"""
return self.EV_ratio
def get_n_modes(self) -> int:
"""
Getter for the number of modes kept during the POD.
Returns
-------
n_modes : int
number of modes kept during the POD.
"""
return self.n_modes
def set_interp_options(
self, interp_type: str = "KRG", interp_options: list = [{}]
) -> None:
"""
Set the options for the interpolation surrogate models used.
Only required if a model different from KRG is used or if non-default options are desired for the models.
Parameters
----------
interp_type : str
Name of the type of surrogate model that will be used for the whole set.
By default, the Kriging model is used (KRG).
interp_options : list[dict]
List containing dictionaries for the options.
The k-th dictionary corresponds to the options of the k-th interpolation model.
If the options are common to all surrogate models, only a single dictionary is required in the list.
The available options can be found in the documentation of the corresponding surrogate models.
By default, the print_global option is set to 'False'.
Examples
--------
>>> interp_type = "KRG"
>>> dict1 = {'corr' : 'matern52', 'theta0' : [1e-2]}
>>> dict2 = {'poly' : 'quadratic'}
>>> interp_options = [dict1, dict2]
>>> sm.set_interp_options(interp_type, interp_options)
"""
if not self.pod_computed:
raise RuntimeError(
"'compute_pod' method must have been succesfully executed before trying to set the models options."
)
if interp_type not in PODI_available_models.keys():
raise ValueError(
f"the surrogate model type should be one of the following : {', '.join(self.available_models_name)}"
)
if len(interp_options) == self.n_modes:
mode_options = "local"
elif len(interp_options) == 1:
mode_options = "global"
else:
raise ValueError(
f"expected interp_options of size {self.n_modes} or 1, but got {len(interp_options)}."
)
self.interp_coeff = []
for i in range(self.n_modes):
if mode_options == "local":
index = i
elif mode_options == "global":
index = 0
sm_i = PODI_available_models[interp_type](print_global=False)
for key in interp_options[index].keys():
sm_i.options[key] = interp_options[index][key]
self.interp_coeff.append(sm_i)
self.interp_options_set = True
def set_training_values(self, xt: np.ndarray) -> None:
"""
Set training data (values).
If the models' options are still not set, default values are used for the initialization.
Parameters
----------
xt : np.ndarray[n_snapshot, nx]
The input values for the n_snapshot training points.
"""
xt = ensure_2d_array(xt, "xt")
if not self.pod_computed:
raise RuntimeError(
"'compute_pod' method must have been succesfully executed before trying to set the training values."
)
if not self.interp_options_set:
self.interp_coeff = []
for i in range(self.n_modes):
sm_i = PODI_available_models["KRG"](print_global=False)
self.interp_coeff.append(sm_i)
self.interp_options_set = True
nt = xt.shape[0]
self.nx = xt.shape[1]
if nt != self.n_snapshot:
raise ValueError(
f"there must be the same amount of train values than snapshots, {nt} != {self.n_snapshot}."
)
for i in range(self.n_modes):
self.interp_coeff[i].set_training_values(xt, self.coeff[:, i])
self.training_values_set = True
def train(self) -> None:
"""
Performs the training of the model.
"""
if not self.pod_computed:
raise RuntimeError(
"'compute_pod' method must have been succesfully executed before trying to train the models."
)
if not self.training_values_set:
raise RuntimeError(
"the training values should have been set before trying to train the models."
)
for interp_coeff in self.interp_coeff:
interp_coeff.train()
self.train_done = True
def get_interp_coeff(self) -> list:
"""
Getter for the list of the interpolation surrogate models used.
Returns
-------
interp_coeff : list[SurrogateModel]
List of the surrogate models used for the POD coefficients.
"""
return self.interp_coeff
def predict_values(self, xn) -> np.ndarray:
"""
Predict the output values at a set of points.
Parameters
----------
xn : np.ndarray[n_new, nx]
Input values for the prediction points.
Returns
-------
yn : np.ndarray[ny, n_new]
Output values at the prediction points.
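Examples
--------
A minimal sketch after training (xn is illustrative):
>>> yn = sm.predict_values(xn)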
"""
if not self.train_done:
raise RuntimeError(
"the model should have been trained before trying to make a prediction."
)
xn = ensure_2d_array(xn, "xn")
dim_new = xn.shape[1]
if dim_new != self.nx:
raise ValueError(
f"the data values and the new values must be the same size, here {dim_new} != {self.nx}"
)
self.n_new = xn.shape[0]
mean_coeff_interp = np.zeros((self.n_modes, self.n_new))
for i in range(self.n_modes):
mu_i = self.interp_coeff[i].predict_values(xn)
mean_coeff_interp[i] = mu_i[:, 0]
y = self.mean + np.dot(self.basis, mean_coeff_interp)
return y
def predict_variances(self, xn) -> np.ndarray:
"""
Predict the variances at a set of points.
Parameters
----------
xn : np.ndarray[n_new, nx]
Input values for the prediction points.
Returns
-------
s2 : np.ndarray[ny, n_new]
Variances.
"""
if not self.train_done:
raise RuntimeError(
"the model should have been trained before trying to make a prediction"
)
xn = ensure_2d_array(xn, "xn")
dim_new = xn.shape[1]
if dim_new != self.nx:
raise ValueError(
f"the data values and the new values must be the same size, here {dim_new} != {self.nx}"
)
self.n_new = xn.shape[0]
var_coeff_interp = np.zeros((self.n_modes, self.n_new))
for i in range(self.n_modes):
sigma_i_square = self.interp_coeff[i].predict_variances(xn)
var_coeff_interp[i] = sigma_i_square[:, 0]
s2 = np.dot((self.basis**2), var_coeff_interp)
return s2
def predict_derivatives(self, xn, kx) -> np.ndarray:
"""
Predict the dy_dx derivatives at a set of points.
Parameters
----------
xn : np.ndarray[n_new, nx]
Input values for the prediction points.
kx : int
The 0-based index of the input variable with respect to which derivative is desired.
Returns
-------
dy_dx : np.ndarray[ny, n_new]
Derivatives.
"""
d = kx
if not self.train_done:
raise RuntimeError(
"the model should have been trained before trying to make a prediction"
)
xn = ensure_2d_array(xn, "xn")
dim_new = xn.shape[1]
if d >= dim_new:
raise ValueError(
"the desired derivative kx should correspond to a dimension of the data, here kx is out of bounds."
)
if dim_new != self.nx:
raise ValueError(
f"the data values and the new values must be the same size, here {dim_new} != {self.nx}"
)
self.n_new = xn.shape[0]
deriv_coeff_interp = np.zeros((self.n_modes, self.n_new))
for i in range(self.n_modes):
deriv_coeff_interp[i] = self.interp_coeff[i].predict_derivatives(xn, d)[
:, 0
]
dy_dx = np.dot(self.basis, deriv_coeff_interp)
return dy_dx
def predict_variance_derivatives(self, xn, kx) -> np.ndarray:
"""
Predict the derivatives of the variances at a set of points.
Parameters
----------
xn : np.ndarray[n_new, nx]
Input values for the prediction points.
kx : int
The 0-based index of the input variable with respect to which derivative is desired.
Returns
-------
dv_dx : np.ndarray[ny, n_new]
Derivatives of the variances.
"""
d = kx
if not self.train_done:
raise RuntimeError(
"the model should have been trained before trying to make a prediction"
)
xn = ensure_2d_array(xn, "xn")
dim_new = xn.shape[1]
if d >= dim_new:
raise ValueError(
"the desired derivative kx should correspond to a dimension of the data, here kx is out of bounds."
)
if dim_new != self.nx:
raise ValueError(
f"the data values and the new values must be the same size, here {dim_new} != {self.nx}"
)
self.n_new = xn.shape[0]
deriv_coeff_interp = np.zeros((self.n_modes, self.n_new))
for i in range(self.n_modes):
deriv_coeff_interp[i] = self.interp_coeff[i].predict_variance_derivatives(
xn, d
)[:, 0]
# s2 = basis**2 @ var (see predict_variances), hence its derivative also uses the squared basis
dv_dx = np.dot(self.basis**2, deriv_coeff_interp)
return dv_dx