import pandas as pd import numpy as np import gpflow import tensorflow as tf from sklearn.exceptions import NotFittedError # Generator for tensorflow functions for a given model def get_model_evaluator(model): @tf.function def model_evaluator(tf_input): preds = model.predict_f(tf_input) return preds return model_evaluator def get_grad_evaluator(model): @tf.function def grad_evaluator(tf_input): with tf.GradientTape() as tape: preds = model.predict_f(tf_input) grads = tape.gradient(preds, tf_input) return grads return grad_evaluator def get_combined_evaluator(model): @tf.function def combined_evaluator(tf_input): with tf.GradientTape() as tape: preds = model.predict_f(tf_input) grads = tape.gradient(preds, tf_input) return preds, grads return combined_evaluator ################################## # SIA norm Temperature Reference ################################## def _get_tref_min(x): X = [19, 23.5] Y = [20.5, 22] a, b = np.polyfit(X, Y, deg = 1) y = a*x + b if y > Y[1]: y = Y[1] elif y < Y[0]: y = Y[0] return y def _get_tref_max(x): X = [12, 17.5] Y = [24.5, 26.5] a, b = np.polyfit(X, Y, deg = 1) y = a*x + b if y > Y[1]: y = Y[1] elif y < Y[0]: y = Y[0] return y def get_tref_range(x): """ Returns the allowed range of reference Temperatures for a given 48h mean outside temperature """ tref_range = (_get_tref_min(x), _get_tref_max(x)) return tref_range def get_tref_mean(x): """ Returns the mean reference temperature for a given 48h mean outside temperature """ tref_range = get_tref_range(x) return 1/2*(tref_range[0] + tref_range[1]) def get_random_signal(nstep, a_range = (-1, 1), b_range = (2, 10), signal_type = 'analog'): a = np.random.rand(nstep) * (a_range[1]-a_range[0]) + a_range[0] # range for amplitude b = np.random.rand(nstep) *(b_range[1]-b_range[0]) + b_range[0] # range for frequency b = np.round(b) b = b.astype(int) b[0] = 0 for i in range(1,np.size(b)): b[i] = b[i-1]+b[i] if signal_type == 'analog': # Random Signal i=0 random_signal = np.zeros(nstep) while b[i] 1, 1, ident_signal) return ident_signal def get_scaled_df(df, dict_cols, scaler): """ Scale the dataframe with the given scaler. Drops unused columns. """ t_list = dict_cols['t'][1] w_list = dict_cols['w'][1] u_list = dict_cols['u'][1] y_list = dict_cols['y'][1] df_local = df[t_list + w_list + u_list + y_list] df_scaled = df_local.to_numpy() try: df_scaled = scaler.transform(df_scaled) except NotFittedError: df_scaled = scaler.fit_transform(df_scaled) df_scaled = pd.DataFrame( df_scaled, index = df_local.index, columns = df_local.columns ) return df_scaled def data_to_gpr(df, dict_cols): t_list = dict_cols['t'][1] w_list = dict_cols['w'][1] u_list = dict_cols['u'][1] y_list = dict_cols['y'][1] df_gpr = df[t_list + w_list + u_list + y_list].copy() for lags, names in dict_cols.values(): for name in names: col_idx = df_gpr.columns.get_loc(name) for lag in range(1, lags + 1): df_gpr.insert( col_idx + lag, f"{name}_{lag}", df_gpr.loc[:, name].shift(lag) ) # Drop empty rows (first rows without enough lags) df_gpr.dropna(inplace = True) return df_gpr def get_gp_model(data, dict_cols): nb_dims = data[0].shape[1] rational_dims = np.arange(0, (dict_cols['t'][0] + 1) * len(dict_cols['t'][1]), 1) nb_rational_dims = len(rational_dims) squared_dims = np.arange(nb_rational_dims, nb_dims, 1) nb_squared_dims = len(squared_dims) squared_l = np.linspace(10, 10, nb_squared_dims) rational_l = np.linspace(10, 10, nb_rational_dims) variance = 1 # Define the kernel to be used k0 = gpflow.kernels.SquaredExponential( lengthscales = squared_l, active_dims = squared_dims, variance = variance ) k1 = gpflow.kernels.Constant( variance = variance ) k2 = gpflow.kernels.RationalQuadratic( lengthscales = rational_l, active_dims = rational_dims, variance = variance ) #k3 = gpflow.kernels.Periodic(k2) k4 = gpflow.kernels.Linear(variance = [1]*nb_dims) k = k4 model = gpflow.models.GPR( data = data, kernel = k, mean_function = None ) return model class ScalerHelper: """ Column order in scaler should be as follows: t_cols + w_cols + u_cols + y_cols """ def __init__(self, scaler): self.scaler = scaler def scale_time(self, t): raise NotImplementedError def inverse_scale_time(self, t): raise NotImplementedError def scale_weather(self, W): W_local = np.array(W).reshape(-1, 2) W_filled = np.hstack([W_local, np.zeros((W_local.shape[0], 4 - W_local.shape[1]))]) W_scaled = self.scaler.transform(W_filled) W_scaled = W_scaled[:, :2] return W_scaled def inverse_scale_weather(self, W): W_local = np.array(W).reshape(-1, 2) W_filled = np.hstack([W_local, np.zeros((W_local.shape[0], 4 - W_local.shape[1]))]) W_scaled = self.scaler.inverse_transform(W_filled) W_scaled = W_scaled[:, :2] return W_scaled def scale_input(self, U): U_local = np.array(U).reshape(-1, 1) U_filled = np.hstack([np.zeros((U_local.shape[0], 2)), U_local, np.zeros((U_local.shape[0], 1))]) U_scaled = self.scaler.transform(U_filled) U_scaled = U_scaled[:, 2] return U_scaled def inverse_scale_input(self, U): U_local = np.array(U).reshape(-1, 1) U_filled = np.hstack([np.zeros((U_local.shape[0], 2)), U_local, np.zeros((U_local.shape[0], 1))]) U_scaled = self.scaler.inverse_transform(U_filled) U_scaled = U_scaled[:, 2] return U_scaled def scale_output(self, Y): Y_local = np.array(Y).reshape(-1, 1) Y_filled = np.hstack([np.zeros((Y_local.shape[0], 3)), Y_local]) Y_scaled = self.scaler.transform(Y_filled) Y_scaled = Y_scaled[:, 3] return Y_scaled def inverse_scale_output(self, Y): Y_local = np.array(Y).reshape(-1, 1) Y_filled = np.hstack([np.zeros((Y_local.shape[0], 3)), Y_local]) Y_scaled = self.scaler.inverse_transform(Y_filled) Y_scaled = Y_scaled[:, 3] return Y_scaled