# metadata
# license: gpl-3.0
# pipeline_tag: graph-ml
# tags:
# - code
# Initialize Model and Tokenizer
import contextlib
import os
from matplotlib import pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import requests
from torchvision import datasets, transforms
import psutil
import time
import subprocess
import onnxruntime as ort
import matplotlib.pyplot as plt
import numpy as np
import numexpr as ne
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
# Load the tokenizer and the seq2seq model from the
# "janpase97/codeformer-pretrained" checkpoint.
# NOTE(review): `model` is rebound to a NanoCircuit instance further down in
# this file, so this seq2seq model is not the one that gets trained; confirm
# whether this load is still needed.
tokenizer = AutoTokenizer.from_pretrained("janpase97/codeformer-pretrained")
model = AutoModelForSeq2SeqLM.from_pretrained("janpase97/codeformer-pretrained")
# Check for the graphics API
def check_graphics_api(target_app_name):
    """Guess which graphics API a running Windows process has loaded.

    Runs ``tasklist /FI "imagename eq <name>" /M`` and scans the module
    listing for well-known graphics library names.

    Args:
        target_app_name: Executable image name, e.g. ``"GTA5.exe"``.

    Returns:
        ``"OpenGL"``, ``"DirectX11"``, ``"DirectX12"`` or ``"VULKAN"`` for the
        first marker found (checked in that order), or ``None`` when no marker
        is present or ``tasklist`` could not be run.
    """
    # (substring to look for, label to report), in priority order.
    known_modules = (
        ("opengl32.dll", "OpenGL"),
        ("d3d11.dll", "DirectX11"),
        ("d3d12.dll", "DirectX12"),
        ("vulkan", "VULKAN"),
    )
    try:
        raw_listing = subprocess.check_output(
            ["tasklist", "/FI", f"imagename eq {target_app_name}", "/M"]
        )
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: tasklist exited with a non-zero status.
        # OSError (including FileNotFoundError): tasklist is not available,
        # e.g. on a non-Windows host. Either way there is nothing to report.
        return None

    # The output is not guaranteed to be valid UTF-8, so undecodable bytes are
    # replaced instead of raising. Lower-casing makes the match independent of
    # how the module names happen to be capitalised in the listing.
    module_listing = raw_listing.decode("utf-8", errors="replace").lower()
    for marker, api_name in known_modules:
        if marker in module_listing:
            return api_name
    return None
# Get the target application's process object
def get_target_app_process(target_app_name):
    """Return the first running process whose name equals ``target_app_name``.

    Args:
        target_app_name: Process name to look for (exact match).

    Returns:
        The matching ``psutil`` process object, or ``None`` if no process
        with that name is found.
    """
    for candidate in psutil.process_iter(['name']):
        if candidate.info['name'] == target_app_name:
            return candidate
    return None
# Attach the AI to the application's process by PID
def attach_ai_to_app_pid(target_app_process):
    """Report whether a target process is available to attach to.

    The function only prints a status message; it performs no operation on
    the process itself.

    Args:
        target_app_process: An object exposing a ``pid`` attribute, or
            ``None`` when the target application was not found.

    Returns:
        ``True`` if a process object was supplied, ``False`` otherwise.
    """
    # Guard clause: nothing to attach to.
    if target_app_process is None:
        print("Could not find the target application's process to attach the AI.")
        return False

    print(f"AI is attached to the application's process with PID: {target_app_process.pid}")
    return True
# Check if the targeted application is running
def is_target_app_running(target_app_name):
    """Tell whether any running process is named exactly ``target_app_name``.

    Args:
        target_app_name: Process name to look for (exact match).

    Returns:
        ``True`` as soon as a matching process is seen, ``False`` if the
        process listing is exhausted without a match.
    """
    for running_process in psutil.process_iter(['name']):
        if running_process.info['name'] == target_app_name:
            return True
    return False
# Create the directory if it doesn't exist
# Folder that receives the exported model files.
directory = r"G:\Epic Games\GTAV\GTA5_AI\trained_models"
# exist_ok=True replaces the separate existence check: no error is raised if
# the folder is already there, and there is no window between "check" and
# "create" in which another process could create it first.
os.makedirs(directory, exist_ok=True)
# Define the neural network model
class NanoCircuit(nn.Module):
    """Two-layer fully connected classifier: ``fc1`` -> ReLU -> ``fc2``.

    Args:
        in_features: Number of values in one flattened sample. Defaults to
            784 (28 * 28).
        hidden_features: Width of the hidden layer. Defaults to 128.
        num_classes: Number of output logits. Defaults to 10.
    """

    def __init__(self, in_features=784, hidden_features=128, num_classes=10):
        super().__init__()
        self.fc1 = nn.Linear(in_features, hidden_features)
        self.fc2 = nn.Linear(hidden_features, num_classes)

    def forward(self, x):
        """Flatten ``x`` to ``(batch, in_features)`` and return the logits.

        Args:
            x: Tensor whose element count is a multiple of ``in_features``,
                e.g. ``(batch, 1, 28, 28)`` or ``(batch, 784)`` with the
                default sizes.

        Returns:
            Tensor of shape ``(batch, num_classes)`` with unnormalised scores.
        """
        # reshape (unlike view) also accepts non-contiguous tensors, copying
        # only when it has to.
        x = x.reshape(-1, self.fc1.in_features)
        x = torch.relu(self.fc1(x))
        return self.fc2(x)
# Set the device to GPU if available
# Fall back to the CPU when CUDA is unavailable. The device type must be
# spelled in lowercase: torch.device rejects "CPU" at runtime.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Load the MNIST dataset
# Preprocessing: convert each image to a tensor, then normalise its single
# channel (0.1307 / 0.3081 are presumably the usual MNIST mean and std).
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize(mean=(0.1307,), std=(0.3081,)),
    ]
)
# Training split, downloaded into ./data when not already present.
train_dataset = datasets.MNIST(root='./data', train=True, transform=transform, download=True)
# Shuffled mini-batches of 64 samples.
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
# Initialize the model and move it to the GPU
# Classification loss; it does not depend on the network, so it is created first.
criterion = nn.CrossEntropyLoss()
# Fresh network placed on the selected device. This rebinds the module-level
# name `model`.
model = NanoCircuit().to(device)
# Plain SGD with momentum over all of the network's parameters.
optimizer = optim.SGD(params=model.parameters(), lr=0.01, momentum=0.9)
# Train the model on the GPU with a data cap
def train_with_data_cap(model, data_loader, criterion, optimizer, device, data_cap_gb):
    """Train ``model`` until about ``data_cap_gb`` GiB of input has been seen.

    The loader is iterated repeatedly (one pass = one "epoch") while the
    running byte count of the input tensors stays below the cap. The batch
    that pushes the count to or past the cap is counted but not trained on.

    Args:
        model: Callable network; it receives each batch flattened to
            ``(-1, 28 * 28)``.
        data_loader: Re-iterable yielding ``(inputs, labels)`` tensor pairs.
        criterion: Loss callable taking ``(outputs, labels)`` and returning a
            scalar tensor.
        optimizer: Optimizer bound to the model's parameters.
        device: Device the batches are moved to before the forward pass.
        data_cap_gb: Cap in GiB (may be fractional). A cap of zero or less
            means no training is done.

    Returns:
        The same ``model`` object, updated in place.

    Raises:
        ValueError: If ``data_loader`` yields no batches while the cap has
            not been reached; the byte count could then never advance.
    """
    data_cap_bytes = data_cap_gb * (1024 ** 3)
    data_processed = 0
    epoch = 0
    while data_processed < data_cap_bytes:
        running_loss = 0.0
        batches_seen = 0
        batches_trained = 0
        for inputs, labels in data_loader:
            batches_seen += 1
            inputs, labels = inputs.to(device), labels.to(device)
            # Update the amount of data processed
            data_processed += inputs.nelement() * inputs.element_size()
            if data_processed >= data_cap_bytes:
                break
            optimizer.zero_grad()
            outputs = model(inputs.view(-1, 28 * 28))
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            batches_trained += 1
        if batches_seen == 0:
            raise ValueError("data_loader yielded no batches; the data cap can never be reached")
        epoch += 1
        # Average over the batches that were actually trained on; the batch
        # that hit the cap contributed no loss and must not dilute the mean.
        if batches_trained:
            print(f"Epoch {epoch}, Loss: {running_loss / batches_trained}")
        print(f"Data processed: {data_processed / (1024 ** 3):.2f} GB")
    return model
# Save the updated model as a .onnx file
def save_model(model, filepath, input_shape=(1, 1, 28, 28)):
    """Export ``model`` to an ONNX file at ``filepath``.

    Args:
        model: ``torch.nn.Module`` to export.
        filepath: Destination path of the ``.onnx`` file.
        input_shape: Shape of the random dummy input passed to the exporter.
            Defaults to ``(1, 1, 28, 28)``.
    """
    # Put the dummy input on the device the model's own parameters live on,
    # rather than on a module-level global: a model/input device mismatch
    # makes the forward pass (and therefore the export) fail.
    first_param = next(model.parameters(), None)
    export_device = first_param.device if first_param is not None else torch.device("cpu")
    dummy_input = torch.randn(*input_shape, device=export_device)
    torch.onnx.export(model, dummy_input, filepath, input_names=['input'], output_names=['output'], opset_version=11)
# Train the model with a 50 GB data cap
# Initial training pass (data_cap_gb=50), followed by an ONNX export.
trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=50)
save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx'))
# NOTE(review): later sections of this file watch "GTA5.exe"; this name looks
# like a mix-up with the exported model's file name. Confirm.
target_app_name = "GTA5_TRAINED.exe"
save_interval_seconds = 5 * 60  # pause between polling iterations, in seconds
application_was_running = False  # set once the target has been seen running
# Poll for the target application: retrain and re-export while it is running,
# and stop once it has been seen running and has exited again.
while True:
    if is_target_app_running(target_app_name):
        print("Target application is running. Training and updating the model...")
        # Shorter pass per cycle (data_cap_gb=.1) than the initial one.
        trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=.1)
        save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx'))
        application_was_running = True
    elif application_was_running:
        # The application ran earlier and is gone now: final export, then stop.
        print("Target application has exited. Saving the model...")
        save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx'))
        print("Finished training and saved the model.")
        break
    else:
        print("Target application is not running. Waiting to start training and updating the model...")
    # NOTE(review): SOURCE lost its indentation; the sleep is placed at loop
    # level so every non-terminating iteration waits. Confirm.
    time.sleep(save_interval_seconds)
def train_with_data_cap(model, data_loader, criterion, optimizer, device, data_cap_gb):
    """Train ``model`` until about ``data_cap_gb`` GiB of input has been seen.

    The loader is iterated repeatedly (one pass = one "epoch") while the
    running byte count of the input tensors stays below the cap. The batch
    that pushes the count to or past the cap is counted but not trained on.

    The loss is computed with ``criterion`` on the model's tensor output so
    that autograd can backpropagate it; a value computed on detached NumPy
    arrays carries no graph and cannot drive ``backward()``.

    Args:
        model: Callable network; it receives each batch flattened to
            ``(-1, 28 * 28)``.
        data_loader: Re-iterable yielding ``(inputs, labels)`` tensor pairs.
        criterion: Loss callable taking ``(outputs, labels)`` and returning a
            scalar tensor.
        optimizer: Optimizer bound to the model's parameters.
        device: Device the batches are moved to before the forward pass.
        data_cap_gb: Cap in GiB (may be fractional). A cap of zero or less
            means no training is done.

    Returns:
        The same ``model`` object, updated in place.

    Raises:
        ValueError: If ``data_loader`` yields no batches while the cap has
            not been reached; the byte count could then never advance.
    """
    data_cap_bytes = data_cap_gb * (1024 ** 3)
    data_processed = 0
    epoch = 0
    while data_processed < data_cap_bytes:
        running_loss = 0.0
        batches_seen = 0
        batches_trained = 0
        for inputs, labels in data_loader:
            batches_seen += 1
            inputs, labels = inputs.to(device), labels.to(device)
            # Update the amount of data processed
            data_processed += inputs.nelement() * inputs.element_size()
            if data_processed >= data_cap_bytes:
                break
            optimizer.zero_grad()
            outputs = model(inputs.view(-1, 28 * 28))
            loss = criterion(outputs, labels)
            # Backpropagate and update the model parameters
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            batches_trained += 1
        if batches_seen == 0:
            raise ValueError("data_loader yielded no batches; the data cap can never be reached")
        epoch += 1
        # Average over the batches that were actually trained on; the batch
        # that hit the cap contributed no loss and must not dilute the mean.
        if batches_trained:
            print(f"Epoch {epoch}, Loss: {running_loss / batches_trained}")
        print(f"Data processed: {data_processed / (1024 ** 3):.2f} GB")
    return model
# Train the model with a 10 GB data cap
# Training pass followed by an ONNX export. The batches must be moved to the
# torch `device`; `os.device_encoding` is an unrelated function from the os
# module and is not a valid target for Tensor.to().
trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=10)
save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx'))
target_app_name = "GTA5.exe"
save_interval_seconds = 5 * 60  # pause between polling iterations, in seconds
application_was_running = False  # set once the target has been seen running
# Poll for the target application: retrain and re-export while it is running,
# and stop once it has been seen running and has exited again.
while True:
    if is_target_app_running(target_app_name):
        print("Target application is running. Training and updating the model...")
        trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=10)
        save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx'))
        application_was_running = True
    elif application_was_running:
        # The application ran earlier and is gone now: final export, then stop.
        print("Target application has exited. Saving the model...")
        save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx'))
        print("Finished training and saved the model.")
        break
    else:
        print("Target application is not running. Waiting to start training and updating the model...")
    # NOTE(review): SOURCE lost its indentation; the sleep is placed at loop
    # level so every non-terminating iteration waits. Confirm.
    time.sleep(save_interval_seconds)
def train_with_data_cap(model, data_loader, criterion, optimizer, device, data_cap_gb):
    """Train ``model`` until about ``data_cap_gb`` GiB of input has been seen.

    The loader is iterated repeatedly (one pass = one "epoch") while the
    running byte count of the input tensors stays below the cap. The batch
    that pushes the count to or past the cap is counted but not trained on.

    The loss is computed with ``criterion`` on the model's tensor output so
    that autograd can backpropagate it; a value computed on detached NumPy
    arrays carries no graph and cannot drive ``backward()``.

    Args:
        model: Callable network; it receives each batch flattened to
            ``(-1, 28 * 28)``.
        data_loader: Re-iterable yielding ``(inputs, labels)`` tensor pairs.
        criterion: Loss callable taking ``(outputs, labels)`` and returning a
            scalar tensor.
        optimizer: Optimizer bound to the model's parameters.
        device: Device the batches are moved to before the forward pass.
        data_cap_gb: Cap in GiB (may be fractional). A cap of zero or less
            means no training is done.

    Returns:
        The same ``model`` object, updated in place.

    Raises:
        ValueError: If ``data_loader`` yields no batches while the cap has
            not been reached; the byte count could then never advance.
    """
    data_cap_bytes = data_cap_gb * (1024 ** 3)
    data_processed = 0
    epoch = 0
    while data_processed < data_cap_bytes:
        running_loss = 0.0
        batches_seen = 0
        batches_trained = 0
        for inputs, labels in data_loader:
            batches_seen += 1
            inputs, labels = inputs.to(device), labels.to(device)
            # Update the amount of data processed
            data_processed += inputs.nelement() * inputs.element_size()
            if data_processed >= data_cap_bytes:
                break
            optimizer.zero_grad()
            outputs = model(inputs.view(-1, 28 * 28))
            loss = criterion(outputs, labels)
            # Backpropagate and update the model parameters
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            batches_trained += 1
        if batches_seen == 0:
            raise ValueError("data_loader yielded no batches; the data cap can never be reached")
        epoch += 1
        # Average over the batches that were actually trained on; the batch
        # that hit the cap contributed no loss and must not dilute the mean.
        if batches_trained:
            print(f"Epoch {epoch}, Loss: {running_loss / batches_trained}")
        print(f"Data processed: {data_processed / (1024 ** 3):.2f} GB")
    return model
target_app_name = "GTA5.exe"
# NOTE(review): assigned but not referenced by the loop below, which uses the
# literals 5 and 1 for its waits. Confirm whether it should be used.
save_interval_seconds = 1 * 60
application_was_running = False  # set once the target has been seen running
# Train and export while the target application runs; stop when it has exited
# after running, or when it does not show up within a 5-second wait.
while True:
    if is_target_app_running(target_app_name):
        print("Target application is running. Training and updating the model...")
        trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=10)
        save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx'))
        application_was_running = True
    elif application_was_running:
        # The application ran earlier and is gone now: final export, then stop.
        print("Target application has exited. Saving the model...")
        save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx'))
        print("Finished training and saved the model.")
        break
    else:
        start_time = time.time()
        print("Target application is not running. Waiting to detect the graphics API...")
        # Re-check about once per second for up to 5 seconds.
        while (time.time() - start_time) < 5:
            if is_target_app_running(target_app_name):
                if graphics_api := check_graphics_api(target_app_name):
                    print(f"Detected {graphics_api} in the target application.")
                    # Leaves only the 5-second wait loop, not the outer loop.
                    break
                else:
                    print("Could not detect the graphics API used in the target application.")
            time.sleep(1)
        if not is_target_app_running(target_app_name):
            print("Target application not detected in 5 seconds. Shutting down the AI.")
            break
# Keep reporting the graphics API while the target application is running;
# stop once it has been absent for a full 5-second wait.
while True:
    if is_target_app_running(target_app_name):
        if graphics_api := check_graphics_api(target_app_name):
            print(f"Detected {graphics_api} in the target application.")
        else:
            print("Could not detect the graphics API used in the target application.")
        # Throttle the polling: without a pause this branch re-runs the
        # process listing back-to-back for as long as the application is up.
        time.sleep(1)
    else:
        start_time = time.time()
        print("Target application is not running. Waiting to start training and updating the model...")
        # Re-check about once per second for up to 5 seconds.
        while (time.time() - start_time) < 5:
            if is_target_app_running(target_app_name):
                # Query the API here instead of reusing `graphics_api`: the
                # name is only bound once a detection has been attempted, so
                # it may not exist yet when this branch is reached first.
                if graphics_api := check_graphics_api(target_app_name):
                    print(f"Detected {graphics_api} in the target application.")
                else:
                    print("Could not detect the graphics API used in the target application.")
                break
            time.sleep(1)
        if not is_target_app_running(target_app_name):
            print("Target application not detected in 5 seconds. Shutting down the AI.")
            break
# Generate some random data for the boxplots
# Fixed seed so the two synthetic samples are the same on every run.
# NOTE(review): both samples are random draws, not values taken from the
# model; confirm that this placeholder data is intended.
np.random.seed(0)
original_data = np.random.normal(0, 1, 100)
trained_data = np.random.normal(0.5, 1, 100)
# Train and export while the target application runs, writing a comparison
# box plot after each pass; stop when the application has exited after
# running, or when it does not show up within a 5-second wait.
while True:
    if is_target_app_running(target_app_name):
        print("Target application is running. Training and updating the model...")
        trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=10)
        save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx'))
        # Create a box plot of the original and trained data
        figure = plt.figure()
        plt.boxplot([original_data, trained_data], labels=["Original Data", "Trained Data"])
        plt.title("Boxplot of Original and Trained Data")
        plt.ylabel("Values")
        # Save the box plot as an image. This must happen before show():
        # once the window opened by show() is closed the figure is gone and
        # savefig() would write an empty image.
        plot_path = r"G:\Epic Games\GTAV\GTA5_AI\Plot Box Comparison\boxplot_comparison.png"
        plot_dir = os.path.dirname(plot_path)
        if plot_dir:
            # savefig does not create missing folders.
            os.makedirs(plot_dir, exist_ok=True)
        plt.savefig(plot_path)
        plt.show()
        # Release the figure so repeated iterations do not pile up open figures.
        plt.close(figure)
        application_was_running = True
    elif application_was_running:
        # The application ran earlier and is gone now: final export, then stop.
        print("Target application has exited. Saving the model...")
        save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx'))
        print("Finished training and saved the model.")
        break
    else:
        start_time = time.time()
        print("Target application is not running. Waiting to detect the graphics API...")
        # Re-check about once per second for up to 5 seconds.
        while (time.time() - start_time) < 5:
            if is_target_app_running(target_app_name):
                if graphics_api := check_graphics_api(target_app_name):
                    print(f"Detected {graphics_api} in the target application.")
                    # Leaves only the 5-second wait loop, not the outer loop.
                    break
                else:
                    print("Could not detect the graphics API used in the target application.")
            time.sleep(1)
        if not is_target_app_running(target_app_name):
            print("Target application not detected in 5 seconds. Shutting down the AI.")
            break