# Forecasting with pretrained N-HiTS / N-BEATS checkpoints (Nixtla transfer learning).
from itertools import chain | |
from pathlib import Path | |
from typing import List, Optional | |
import neuralforecast as nf | |
import numpy as np | |
import pandas as pd | |
import pytorch_lightning as pl | |
from datasetsforecast.utils import download_file | |
from hyperopt import hp | |
from neuralforecast.auto import NHITS as autoNHITS | |
from neuralforecast.data.tsdataset import WindowsDataset | |
from neuralforecast.data.tsloader import TimeSeriesLoader | |
from neuralforecast.models.mqnhits.mqnhits import MQNHITS | |
from neuralforecast.models.nhits.nhits import NHITS | |
# GLOBAL PARAMETERS
# Fallback forecast horizon when a caller does not supply one.
DEFAULT_HORIZON = 30
# Hyperopt iterations for the auto-tuned models — not used in this chunk; TODO confirm caller.
HYPEROPT_STEPS = 10
# Cap on fine-tuning steps — not used in this chunk; TODO confirm caller.
MAX_STEPS = 1000
# Validation window length (presumably 2 months of daily points) — TODO confirm caller.
N_TS_VAL = 2 * 30
# Registry of downloadable pretrained checkpoints.
# Keys are display names; "model" is the checkpoint basename under ./models/,
# "card" a short model identifier, and "max_steps" == 0 means no fine-tuning.
MODELS = {
    "Pretrained N-HiTS M4 Hourly": {
        "card": "nhitsh",
        "max_steps": 0,
        "model": "nhits_m4_hourly",
    },
    "Pretrained N-HiTS M4 Hourly (Tiny)": {
        "card": "nhitsh",
        "max_steps": 0,
        "model": "nhits_m4_hourly_tiny",
    },
    "Pretrained N-HiTS M4 Daily": {
        "card": "nhitsd",
        "max_steps": 0,
        "model": "nhits_m4_daily",
    },
    "Pretrained N-HiTS M4 Monthly": {
        "card": "nhitsm",
        "max_steps": 0,
        "model": "nhits_m4_monthly",
    },
    "Pretrained N-HiTS M4 Yearly": {
        "card": "nhitsy",
        "max_steps": 0,
        "model": "nhits_m4_yearly",
    },
    "Pretrained N-BEATS M4 Hourly": {
        "card": "nbeatsh",
        "max_steps": 0,
        "model": "nbeats_m4_hourly",
    },
    "Pretrained N-BEATS M4 Daily": {
        "card": "nbeatsd",
        "max_steps": 0,
        "model": "nbeats_m4_daily",
    },
    "Pretrained N-BEATS M4 Weekly": {
        "card": "nbeatsw",
        "max_steps": 0,
        "model": "nbeats_m4_weekly",
    },
    "Pretrained N-BEATS M4 Monthly": {
        "card": "nbeatsm",
        "max_steps": 0,
        "model": "nbeats_m4_monthly",
    },
    "Pretrained N-BEATS M4 Yearly": {
        "card": "nbeatsy",
        "max_steps": 0,
        "model": "nbeats_m4_yearly",
    },
}
def download_models() -> None:
    """Download every pretrained checkpoint in MODELS that is missing locally.

    Checkpoints are fetched from the public Nixtla S3 bucket into ./models/;
    files already present on disk are skipped, so repeated calls are cheap.
    """
    # The display-name keys are irrelevant here, so iterate values directly.
    for meta in MODELS.values():
        if not Path(f'./models/{meta["model"]}.ckpt').is_file():
            download_file(
                "./models/",
                f'https://nixtla-public.s3.amazonaws.com/transfer/pretrained_models/{meta["model"]}.ckpt',
            )


# Module-level side effect: make sure checkpoints exist before any forecast call.
download_models()
class StandardScaler:
    """Standardize (z-score) the ``y`` column of a long-format panel of series.

    Expects columns ``unique_id``, ``ds``, ``y``; statistics are computed per
    ``unique_id`` so each series is scaled independently.
    """

    def __init__(self) -> None:
        # Per-series statistics, filled by `fit`: columns [unique_id, mean, std].
        self.norm: pd.DataFrame

    def fit(self, X: pd.DataFrame) -> "StandardScaler":
        """Compute per-series mean/std of ``y``; return self for chaining."""
        self.norm = X.groupby("unique_id").agg({"y": [np.mean, np.std]})
        self.norm = self.norm.droplevel(0, 1).reset_index()
        # Bug fix: the annotation promised a StandardScaler but nothing was returned.
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Return X with ``y`` standardized using the fitted statistics."""
        transformed = X.merge(self.norm, how="left", on=["unique_id"])
        transformed["y"] = (transformed["y"] - transformed["mean"]) / transformed["std"]
        return transformed[["unique_id", "ds", "y"]]

    def inverse_transform(self, X: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
        """Undo the scaling on each column in ``cols``.

        Returns a frame with columns ``["unique_id", "ds"] + cols``.
        """
        transformed = X.merge(self.norm, how="left", on=["unique_id"])
        for col in cols:
            transformed[col] = (
                transformed[col] * transformed["std"] + transformed["mean"]
            )
        return transformed[["unique_id", "ds"] + cols]
def compute_ds_future(Y_df, fh):
    """Build the ``fh`` future timestamps following the last ``ds`` of each series.

    For a single series, tries ``pd.infer_freq``; when the frequency cannot be
    inferred (too few or irregular points) it falls back to the spacing of the
    last two observations and returns string timestamps. For multiple series,
    results are computed per series and concatenated in groupby order.
    """
    if Y_df["unique_id"].nunique() == 1:
        ds_ = pd.to_datetime(Y_df["ds"].values)
        try:
            freq = pd.infer_freq(ds_)
        # Was a bare `except:`; infer_freq raises ValueError (fewer than 3
        # points) or TypeError (non-datetime input) — catch only those.
        except (ValueError, TypeError):
            freq = None
        if freq is not None:
            # date_range includes the anchor itself, so take fh+1 and drop it.
            ds_future = pd.date_range(ds_[-1], periods=fh + 1, freq=freq)[1:]
        else:
            freq = ds_[-1] - ds_[-2]
            ds_future = [ds_[-1] + (i + 1) * freq for i in range(fh)]
            ds_future = list(map(str, ds_future))
        return ds_future
    else:
        # Recurse per series; each unique_id contributes fh timestamps.
        ds_future = chain(
            *[compute_ds_future(df, fh) for _, df in Y_df.groupby("unique_id")]
        )
        return list(ds_future)
def forecast_pretrained_model(
    Y_df: pd.DataFrame, model: str, fh: int, max_steps: int = 0
):
    """Forecast ``fh`` steps ahead with a pretrained MQ-N-HiTS checkpoint.

    Parameters
    ----------
    Y_df : long-format frame with columns [``unique_id``,] ``ds``, ``y``. A
        single series without ``unique_id`` is accepted; a synthetic id is added.
    model : checkpoint basename (see MODELS), loaded from ``./models/{model}.ckpt``.
    fh : forecast horizon — number of rows returned per series.
    max_steps : if > 0, fine-tune the checkpoint on ``Y_df`` for that many steps.

    Returns
    -------
    DataFrame with columns ``unique_id``, ``ds``, ``y_5``, ``y_50``, ``y_95``
    (5/50/95 forecast quantiles), ``fh`` rows per series.
    """
    if "unique_id" not in Y_df:
        # Bug fix: work on a copy so the caller's frame is not mutated in place.
        Y_df = Y_df.copy()
        Y_df.insert(0, "unique_id", "ts_1")
    # Scale each series to zero mean / unit variance before the network sees it.
    scaler = StandardScaler()
    scaler.fit(Y_df)
    Y_df = scaler.transform(Y_df)
    # Model
    file_ = f"./models/{model}.ckpt"
    mqnhits = MQNHITS.load_from_checkpoint(file_)
    # Optional fine-tuning on the provided data.
    if max_steps > 0:
        train_dataset = WindowsDataset(
            Y_df=Y_df,
            X_df=None,
            S_df=None,
            mask_df=None,
            f_cols=[],
            input_size=mqnhits.n_time_in,
            output_size=mqnhits.n_time_out,
            sample_freq=1,
            complete_windows=True,
            verbose=False,
        )
        train_loader = TimeSeriesLoader(
            dataset=train_dataset, batch_size=1, n_windows=32, shuffle=True
        )
        trainer = pl.Trainer(
            max_epochs=None,
            checkpoint_callback=False,
            logger=False,
            max_steps=max_steps,
            gradient_clip_val=1.0,
            progress_bar_refresh_rate=1,
            log_every_n_steps=1,
        )
        trainer.fit(mqnhits, train_loader)
    # Forecast
    forecast_df = mqnhits.forecast(Y_df=Y_df)
    # Bring the quantile columns back to the original scale of each series.
    forecast_df = scaler.inverse_transform(forecast_df, cols=["y_5", "y_50", "y_95"])
    # Pad or trim so every series contributes exactly `fh` rows.
    n_ts = forecast_df["unique_id"].nunique()
    if fh * n_ts > len(forecast_df):
        # Model horizon shorter than fh: tile each series' forecast, then trim.
        forecast_df = (
            forecast_df.groupby("unique_id")
            .apply(lambda df: pd.concat([df] * fh).head(fh))
            .reset_index(drop=True)
        )
    else:
        forecast_df = forecast_df.groupby("unique_id").head(fh)
    forecast_df["ds"] = compute_ds_future(Y_df, fh)
    return forecast_df
if __name__ == "__main__": | |
df = pd.read_csv( | |
"https://raw.githubusercontent.com/Nixtla/transfer-learning-time-series/main/datasets/ercot_COAST.csv" | |
) | |
df.columns = ["ds", "y"] | |
multi_df = pd.concat([df.assign(unique_id=f"ts{i}") for i in range(2)]) | |
assert len(compute_ds_future(multi_df, 80)) == 2 * 80 | |
for _, meta in MODELS.items(): | |
# test just a time series (without unique_id) | |
forecast = forecast_pretrained_model(df, model=meta["model"], fh=80) | |
assert forecast.shape == (80, 5) | |
# test multiple time series | |
multi_forecast = forecast_pretrained_model(multi_df, model=meta["model"], fh=80) | |
assert multi_forecast.shape == (80 * 2, 5) | |