File size: 6,558 Bytes
9b9e880 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 |
import data_source.crypto_compare as cc
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from keras.models import Sequential, load_model
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import Dropout
import logging
import os
# Initial ltsm code building off of
def import_tensorflow():
# Filter tensorflow version warnings
# https://stackoverflow.com/questions/40426502/is-there-a-way-to-suppress-the-messages-tensorflow-prints/40426709
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' # or any {'0', '1', '2'}
os.environ['CUDA_VISIBLE_DEVICES'] = '-1'
import warnings
# https://stackoverflow.com/questions/15777951/how-to-suppress-pandas-future-warning
warnings.simplefilter(action='ignore', category=FutureWarning)
warnings.simplefilter(action='ignore', category=Warning)
import tensorflow as tf
tf.get_logger().setLevel('INFO')
tf.autograph.set_verbosity(0)
tf.get_logger().setLevel(logging.ERROR)
return tf
tf = import_tensorflow()
class BtcLtsm:
def __init__(self):
self._data_source = cc.CryptoCompare()
self._train_name_base = 'btc_price_train'
self._test_name_base = 'btc_price_test'
self._model_name_base = 'btc_ltsm'
self._history = 60
self._layer_size = 50
self._dropout = 0.2
def update_dataset(self, percent_train=0.98, limit=2000):
try:
ohlcv_df = self._data_source.get_daily_history('BTC', 'USDT', limit=limit)
test_start_idx = int(len(ohlcv_df) * percent_train)
train_df = ohlcv_df[:test_start_idx]
test_df = ohlcv_df[test_start_idx:]
train_df.to_csv(os.path.join('datasets', f'{self._train_name_base}.csv'), index=False)
test_df.to_csv(os.path.join('datasets', f'{self._test_name_base}.csv'), index=False)
return True
except Exception as e:
# Catch all exceptions and print the error message
print(f"An error occurred: {e}")
return False
def train(self):
train_file_name = os.path.join('datasets', f'{self._train_name_base}.csv')
data_train = pd.read_csv(train_file_name)
train_set = data_train.iloc[:, 1:2].values
sc = MinMaxScaler(feature_range=(0, 1))
train_set = sc.fit_transform(train_set)
logging.debug(f'training set:\n{train_set}')
# Creating a data structure with 60 timesteps and 1 output
history = 60
features_train = []
results_train = []
for i in range(history, len(train_set)):
features_train.append(train_set[i-history:i, 0])
results_train.append(train_set[i, 0])
features_train, results_train = np.array(features_train), np.array(results_train)
# Reshaping
features_train = np.reshape(features_train, (features_train.shape[0], features_train.shape[1], 1))
model_path = os.path.join('predictors/saved', f'{self._model_name_base}.h5')
self._create_rnn(model_path, features_train, results_train)
def load(self):
model_path = os.path.join('predictors/saved', f'{self._model_name_base}.h5')
self._regressor = load_model(model_path)
def test_model(self):
train_file_name = os.path.join('datasets', f'{self._train_name_base}.csv')
test_file_name = os.path.join('datasets', f'{self._test_name_base}.csv')
# Getting the real stock price of 2017
dataset_test = pd.read_csv(test_file_name)
real_stock_price = dataset_test.iloc[:, 1:2].values
dataset_train = pd.read_csv(train_file_name)
train_set = dataset_train.iloc[:, 1:2].values
sc = MinMaxScaler(feature_range = (0, 1))
train_set = sc.fit_transform(train_set)
# Getting the predicted stock price of 2017
dataset_total = pd.concat((dataset_train['open'], dataset_test['open']), axis = 0)
inputs = dataset_total[len(dataset_total) - len(dataset_test) - 60:].values
inputs = inputs.reshape(-1,1)
inputs = sc.transform(inputs)
features_test = []
for i in range(self._history, len(inputs)):
features_test.append(inputs[i-self._history:i, 0])
features_test = np.array(features_test)
features_test = np.reshape(features_test, (features_test.shape[0], features_test.shape[1], 1))
predicted_stock_price = self._regressor.predict(features_test)
predicted_stock_price = sc.inverse_transform(predicted_stock_price)
# Visualising the results
plt.plot(real_stock_price, color = 'red', label = 'Real Price')
plt.plot(predicted_stock_price, color = 'blue', label = 'Predicted Price')
plt.title('BTC Price Prediction')
plt.xlabel('Time')
plt.ylabel('BTC Price')
plt.legend()
plt.savefig('btc_price_prediction.png')
def _create_rnn(self, model_name, features_train, results_train, epochs=100, batch_size=32):
# Initialising the RNN
self._regressor = Sequential()
# Adding the first LSTM layer and some Dropout regularisation
self._regressor.add(LSTM(units = self._layer_size, return_sequences = True, input_shape = (features_train.shape[1], 1)))
self._regressor.add(Dropout(self._dropout))
# Adding a second LSTM layer and some Dropout regularisation
self._regressor.add(LSTM(units = self._layer_size, return_sequences = True))
self._regressor.add(Dropout(self._dropout))
# Adding a third LSTM layer and some Dropout regularisation
self._regressor.add(LSTM(units = self._layer_size, return_sequences = True))
self._regressor.add(Dropout(self._dropout))
# Adding a fourth LSTM layer and some Dropout regularisation
self._regressor.add(LSTM(units = self._layer_size))
self._regressor.add(Dropout(self._dropout))
# Adding the output layer
self._regressor.add(Dense(units = 1))
# Compiling the RNN
self._regressor.compile(optimizer = 'adam', loss = 'mean_squared_error')
# Fitting the RNN to the Training set
self._regressor.fit(features_train, results_train, epochs = epochs, batch_size = batch_size)
self._regressor.save(model_ethereum_mixtral) |