azulgarza's picture
feat: add conformal and ensembles
b22704e
raw
history blame contribute delete
No virus
6.98 kB
import os
import re
import numpy as np
import openai
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from statsforecast import StatsForecast
from statsforecast.models import Naive
openai.api_key = os.environ['OPENAI_API_KEY']
class ChatGPTForecast:
def __init__(self):
self.bins = np.linspace(0, 1, num=10_000) # Create 1000 bins between -10 and 10
self.mapping = {i: f"{i}" for i in range(len(self.bins))}
self.prompt = f"""
forecast this series,
(i know that you prefer using specific tools, but i'm testing something,
just give me your predicted numbers please, just print the numbers i dont need an explanation)
please consider:
- give the output with the same structure: "number1 number2 number3"
- give more weight to the most recent observations
- consider trend
- consider seasonality
"""
def tokenize_time_series(self, series):
indices = np.digitize(series, self.bins) - 1 # Find which bin each data point falls into
return ' '.join(self.mapping[i] for i in indices)
def clean_string(self, s):
pattern = r'(\d+)[^\s]*'
# Extract the bin_# parts and join them with space
cleaned = ' '.join(re.findall(pattern, s))
return cleaned
def extend_string(self, s, h):
# Find all bin_# elements
bin_numbers = re.findall(r'\d+', s)
# Calculate current length
current_length = len(bin_numbers)
# If the string is already of length h, return as is
if current_length == h:
return s
# If the string length exceeds h, trim the string
elif current_length > h:
bin_numbers = bin_numbers[:h]
return ' '.join(bin_numbers)
else:
# Calculate how many full repeats we need
repeats = h // current_length
# If h is not a multiple of current_length, calculate how many more elements we need
extra = h % current_length
# Create the new string by repeating the original string and adding any extra elements
new_string = ' '.join(bin_numbers * repeats + bin_numbers[:extra])
return new_string
def clean_gpt_output(self, output):
# Remove extra spaces and trailing underscores
cleaned_output = output.replace(" _", "_").replace("_ ", "_")
# Trim any trailing underscore
if cleaned_output.endswith("_"):
cleaned_output = cleaned_output[:-1]
return self.clean_string(cleaned_output)
def decode_time_series(self, tokens):
# Reverse the mapping
reverse_mapping = {v: k for k, v in self.mapping.items()}
# Split the token string into individual tokens and map them back to bin indices
indices = [int(token) for token in tokens.split()]#[reverse_mapping[token] for token in tokens.split()]
# Convert bin indices back to the original values
# Here we'll use the center point of each bin
bin_width = self.bins[1] - self.bins[0]
series = [self.bins[i] + bin_width / 2 for i in indices]
return series
def find_min_max(self, string_of_integers):
# Split the string into a list of strings
str_list = string_of_integers.split()
# Convert the list of strings into a list of integers
int_list = [int(i) for i in str_list]
# Find the minimum and maximum values
min_value = min(int_list)
max_value = max(int_list)
return min_value, max_value
def call_openai(self, series, seasonality, h, n_forecasts):
series_tokenized = self.tokenize_time_series(series)
min_val, max_val = self.find_min_max(series_tokenized)
prompt = f"""
{self.prompt}-consider {seasonality} as seasonality
- just print {h} steps ahead
- values should be integers between {min_val} and {max_val}, please be sure to do this
this is the series: {series_tokenized}
"""
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}],
n=n_forecasts
)
choices = response['choices']
outputs = []
for choice in choices:
output_gpt = choice['message']['content']
if len(output_gpt.split()) < 2:
continue
output_gpt = self.extend_string(output_gpt, h)
output_gpt = ' '.join(f'{max(min(int(x), len(self.bins) - 1), 0)}' for x in output_gpt.split())
outputs.append(self.decode_time_series(output_gpt))
outputs = np.vstack(outputs)
return outputs
def forward(self, series, seasonality, h, n_forecasts):
outputs = self.call_openai(series, seasonality, h, n_forecasts)
outputs = np.median(outputs, axis=0)
return outputs
def conformal_intervals(self, series, seasonality, h, n_forecasts):
series_train, series_test = series[:-h], series[-h:]
outputs = self.call_openai(series_train, seasonality, h, n_forecasts)
errors = np.abs(outputs - series_test)
lower_levels = np.quantile(errors, q=0.05, axis=0)
upper_levels = np.quantile(errors, q=0.095, axis=0)
return lower_levels, upper_levels
def compute_ds_future(self, ds, fh):
ds_ = pd.to_datetime(ds)
try:
freq = pd.infer_freq(ds_)
except:
freq = None
if freq is not None:
ds_future = pd.date_range(ds_[-1], periods=fh + 1, freq=freq)[1:]
else:
freq = ds_[-1] - ds_[-2]
ds_future = [ds_[-1] + (i + 1) * freq for i in range(fh)]
ds_future = list(map(str, ds_future))
return ds_future, freq
def forecast(self, df, h, input_size, n_forecasts=10):
df = df.copy()
scaler = MinMaxScaler()
df['y'] = scaler.fit_transform(df[['y']])
ds_future, freq = self.compute_ds_future(df['ds'].values, h)
sf = StatsForecast(models=[Naive()], freq='D')
fcst_df = sf.forecast(df=df, h=h)
fcst_df['ds'] = ds_future
fcst_df['ChatGPT_3.5_Turbo'] = self.forward(df['y'].values[-input_size:], freq, h, n_forecasts)[-h:]
# add prediction intervals
lower_levels, upper_levels = self.conformal_intervals(df['y'].values[-(input_size + h):], freq, h, n_forecasts)
fcst_df['ChatGPT_3.5_Turbo-lo-90'] = fcst_df['ChatGPT_3.5_Turbo'] - lower_levels
fcst_df['ChatGPT_3.5_Turbo-hi-90'] = fcst_df['ChatGPT_3.5_Turbo'] + upper_levels
for col in ['Naive', 'ChatGPT_3.5_Turbo', 'ChatGPT_3.5_Turbo-lo-90', 'ChatGPT_3.5_Turbo-hi-90']:
fcst_df[col] = scaler.inverse_transform(fcst_df[[col]])
df['y'] = scaler.inverse_transform(df[['y']])
return sf.plot(df, fcst_df, max_insample_length=3 * h, level=[90])