import json from sklearn.preprocessing import StandardScaler from pickle import load import numpy as np class VAVPipeline: def __init__(self, rtu_id, scaler_path=None, window_size=30): self.window_size = window_size if rtu_id == 1: self.zones = [69, 68, 67, 66, 65, 64, 42, 41, 40, 39, 38, 37, 36] if rtu_id == 2: self.zones = [ 72, 71, 63, 62, 60, 59, 58, 57, 50, 49, 44, 43, 35, 34, 33, 32, 31, 30, 29, 28, ] outputs = ["temp", "fan_speed"] inputs = ["cooling_sp", "heating_sp"] self.output_col_names = [] self.input_col_names = [ f"rtu_00{rtu_id}_fltrd_sa_flow_tn", f"rtu_00{rtu_id}_sa_temp", "air_temp_set_1", "air_temp_set_2", "dew_point_temperature_set_1d", "relative_humidity_set_1", "solar_radiation_set_1", ] for zone in self.zones: for output in outputs: self.output_col_names.append(f"zone_0{zone}_{output}") for input in inputs: self.input_col_names.append(f"zone_0{zone}_{input}") self.column_names = self.output_col_names + self.input_col_names if scaler_path: self.scaler = self.get_scaler(scaler_path) def get_scaler(self, scaler_path): return load(scaler_path) def get_window(self, df): len_df = len(df) if len_df > self.window_size: return df[len_df - (self.window_size + 1) : len_df].astype("float32") else: return None def transform_window(self, df_window): return self.scaler.transform(df_window) def prepare_input(self, df_trans): return df_trans[: self.window_size, :].reshape( (1, self.window_size, len(self.column_names)) ) def extract_data_from_message(self, message): payload = json.loads(message.payload.decode()) len_df = len(self.df) k = {} for col in self.column_names: k[col] = payload[col] self.df.loc[len_df] = k return self.df def fit(self, message): df = self.extract_data_from_message(message) df_window = self.get_window(df) if df_window is not None: df_trans = self.transform_window(df_window) df_new = self.prepare_input(df_trans) else: df_new = None df_trans = None return df_new, df_trans