import os
import re

import numpy as np
import openai
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from statsforecast import StatsForecast
from statsforecast.models import Naive

# Fail fast (KeyError) if the key is not configured — intentional.
openai.api_key = os.environ['OPENAI_API_KEY']


class ChatGPTForecast:
    """Forecast a univariate series by prompting gpt-3.5-turbo.

    The series is MinMax-scaled to [0, 1], discretized into bin-index
    tokens, and sent to the chat model; the median of several sampled
    completions is the point forecast.  A Naive baseline (StatsForecast)
    and conformal prediction intervals are added for comparison/plotting.
    """

    def __init__(self):
        # 10,000 bin edges evenly spaced on [0, 1]; inputs are MinMax-scaled
        # into this range before tokenization.
        self.bins = np.linspace(0, 1, num=10_000)
        # Identity mapping bin-index -> token string.
        self.mapping = {i: f"{i}" for i in range(len(self.bins))}
        self.prompt = f"""
forecast this series,
(i know that you prefer using specific tools, but i'm testing something,
just give me your predicted numbers please, just print the numbers i dont
need an explanation)
please consider:
- give the output with the same structure: "number1 number2 number3"
- give more weight to the most recent observations
- consider trend
- consider seasonality
"""

    def tokenize_time_series(self, series):
        """Map each value of ``series`` to its bin index, joined with spaces."""
        # np.digitize returns 1-based insertion points; shift to 0-based.
        indices = np.digitize(series, self.bins) - 1
        return ' '.join(self.mapping[i] for i in indices)

    def clean_string(self, s):
        """Keep only the leading digits of each token, space-joined."""
        pattern = r'(\d+)[^\s]*'
        return ' '.join(re.findall(pattern, s))

    def extend_string(self, s, h):
        """Return exactly ``h`` space-separated integer tokens extracted
        from ``s``, trimming or cyclically repeating as needed.

        Assumes ``s`` contains at least one digit group (callers skip
        replies with fewer than two tokens).
        """
        bin_numbers = re.findall(r'\d+', s)
        current_length = len(bin_numbers)
        if current_length >= h:
            # Trim (or keep, when equal).  Always rebuild from the extracted
            # digit groups so stray non-numeric text in ``s`` is dropped —
            # returning ``s`` unchanged here used to crash int() downstream.
            return ' '.join(bin_numbers[:h])
        # Tile the sequence to reach length h.
        repeats = h // current_length
        extra = h % current_length
        return ' '.join(bin_numbers * repeats + bin_numbers[:extra])

    def clean_gpt_output(self, output):
        """Normalize underscore spacing in a GPT reply, then strip to digits."""
        cleaned_output = output.replace(" _", "_").replace("_ ", "_")
        if cleaned_output.endswith("_"):
            cleaned_output = cleaned_output[:-1]
        return self.clean_string(cleaned_output)

    def decode_time_series(self, tokens):
        """Convert a token string of bin indices back to values, using the
        midpoint of each bin."""
        indices = [int(token) for token in tokens.split()]
        bin_width = self.bins[1] - self.bins[0]
        return [self.bins[i] + bin_width / 2 for i in indices]

    def find_min_max(self, string_of_integers):
        """Return (min, max) of a space-separated string of integers."""
        int_list = [int(i) for i in string_of_integers.split()]
        return min(int_list), max(int_list)

    def call_openai(self, series, seasonality, h, n_forecasts):
        """Sample ``n_forecasts`` forecasts of horizon ``h`` from the model.

        Returns an array of shape (<= n_forecasts, h) of decoded values;
        replies with fewer than two tokens are discarded.
        """
        series_tokenized = self.tokenize_time_series(series)
        min_val, max_val = self.find_min_max(series_tokenized)
        prompt = f"""
{self.prompt}-consider {seasonality} as seasonality
- just print {h} steps ahead
- values should be integers between {min_val} and {max_val},
please be sure to do this
this is the series: {series_tokenized}
"""
        # NOTE(review): uses the legacy pre-1.0 openai API surface
        # (openai.ChatCompletion) — pinned to the installed client version.
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            n=n_forecasts,
        )
        outputs = []
        for choice in response['choices']:
            output_gpt = choice['message']['content']
            if len(output_gpt.split()) < 2:
                continue  # unusable reply; skip rather than crash
            output_gpt = self.extend_string(output_gpt, h)
            # Clamp each predicted index into the valid bin range.
            output_gpt = ' '.join(
                f'{max(min(int(x), len(self.bins) - 1), 0)}'
                for x in output_gpt.split()
            )
            outputs.append(self.decode_time_series(output_gpt))
        return np.vstack(outputs)

    def forward(self, series, seasonality, h, n_forecasts):
        """Point forecast: element-wise median across sampled forecasts."""
        outputs = self.call_openai(series, seasonality, h, n_forecasts)
        return np.median(outputs, axis=0)

    def conformal_intervals(self, series, seasonality, h, n_forecasts):
        """Conformal offsets from absolute errors on the last ``h`` holdout
        points.  Returns (lower_levels, upper_levels), each of length h."""
        series_train, series_test = series[:-h], series[-h:]
        outputs = self.call_openai(series_train, seasonality, h, n_forecasts)
        errors = np.abs(outputs - series_test)
        # NOTE(review): the lower offset uses the 5% quantile of |errors|,
        # which gives a much tighter lower band than upper — looks
        # intentional but asymmetric; confirm with the author.
        lower_levels = np.quantile(errors, q=0.05, axis=0)
        # Fixed: was q=0.095, an evident typo for the 95% quantile that the
        # "-hi-90" interval label implies.
        upper_levels = np.quantile(errors, q=0.95, axis=0)
        return lower_levels, upper_levels

    def compute_ds_future(self, ds, fh):
        """Build ``fh`` future timestamps (as strings) after the last value
        of ``ds``.  Returns (ds_future, freq) where freq is the inferred
        pandas frequency string, or a Timedelta fallback when inference
        fails."""
        ds_ = pd.to_datetime(ds)
        try:
            freq = pd.infer_freq(ds_)
        except Exception:
            freq = None
        if freq is not None:
            ds_future = pd.date_range(ds_[-1], periods=fh + 1, freq=freq)[1:]
        else:
            # Fall back to the spacing of the last two observations.
            freq = ds_[-1] - ds_[-2]
            ds_future = [ds_[-1] + (i + 1) * freq for i in range(fh)]
        return list(map(str, ds_future)), freq

    def forecast(self, df, h, input_size, n_forecasts=10):
        """Forecast ``h`` steps of ``df`` (columns 'ds', 'y'), plot the GPT
        forecast with 90% conformal intervals against a Naive baseline.

        ``input_size`` limits how many trailing observations are sent to
        the model; ``n_forecasts`` is the number of sampled completions.
        """
        df = df.copy()
        scaler = MinMaxScaler()
        df['y'] = scaler.fit_transform(df[['y']])
        ds_future, freq = self.compute_ds_future(df['ds'].values, h)
        # NOTE(review): freq is inferred above but 'D' is hard-coded here —
        # presumably daily data only; confirm before using other frequencies.
        sf = StatsForecast(models=[Naive()], freq='D')
        fcst_df = sf.forecast(df=df, h=h)
        fcst_df['ds'] = ds_future
        fcst_df['ChatGPT_3.5_Turbo'] = self.forward(
            df['y'].values[-input_size:], freq, h, n_forecasts
        )[-h:]
        # Add conformal prediction intervals around the point forecast.
        lower_levels, upper_levels = self.conformal_intervals(
            df['y'].values[-(input_size + h):], freq, h, n_forecasts
        )
        fcst_df['ChatGPT_3.5_Turbo-lo-90'] = fcst_df['ChatGPT_3.5_Turbo'] - lower_levels
        fcst_df['ChatGPT_3.5_Turbo-hi-90'] = fcst_df['ChatGPT_3.5_Turbo'] + upper_levels
        # Undo the MinMax scaling for all plotted columns.
        for col in ['Naive', 'ChatGPT_3.5_Turbo',
                    'ChatGPT_3.5_Turbo-lo-90', 'ChatGPT_3.5_Turbo-hi-90']:
            fcst_df[col] = scaler.inverse_transform(fcst_df[[col]])
        df['y'] = scaler.inverse_transform(df[['y']])
        return sf.plot(df, fcst_df, max_insample_length=3 * h, level=[90])