|
--- |
|
license: gpl-3.0 |
|
pipeline_tag: graph-ml |
|
tags: |
|
- code |
|
--- |
|
|
|
|
# Initialize the model and tokenizer
|
```python |
|
import contextlib
import os
import subprocess
import time

import matplotlib.pyplot as plt
import numexpr as ne
import numpy as np
import onnxruntime as ort
import psutil
import requests
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
|
|
|
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM |
|
|
|
tokenizer = AutoTokenizer.from_pretrained("janpase97/codeformer-pretrained") |
|
|
|
model = AutoModelForSeq2SeqLM.from_pretrained("janpase97/codeformer-pretrained") |
|
``` |
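The tokenizer and model are only loaded here and never called again below, so a quick smoke test helps confirm the checkpoint generates; this is a minimal sketch with an illustrative prompt:

```python
# Minimal smoke test of the loaded seq2seq checkpoint (the prompt is illustrative).
prompt = "def add(a, b):"
inputs = tokenizer(prompt, return_tensors="pt")
generated_ids = model.generate(**inputs, max_new_tokens=64)
print(tokenizer.decode(generated_ids[0], skip_special_tokens=True))
```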
|
|
|
# Check for the graphics API |
|
|
|
```python |
|
def check_graphics_api(target_app_name): |
|
graphics_api = None |
|
|
|
with contextlib.suppress(subprocess.CalledProcessError): |
|
output = subprocess.check_output(['tasklist', '/FI', f'imagename eq {target_app_name}', '/M']).decode('utf-8') |
|
if "opengl32.dll" in output: |
|
graphics_api = "OpenGL" |
|
elif "d3d11.dll" in output: |
|
graphics_api = "DirectX11" |
|
elif "d3d12.dll" in output: |
|
graphics_api = "DirectX12" |
|
elif "vulkan" in output: |
|
graphics_api = "VULKAN" |
|
return graphics_api |
|
``` |
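`check_graphics_api` shells out to `tasklist`, so it only works on Windows and only while the process is running. A minimal usage sketch, reusing the process name from the sections below:

```python
# Windows-only: inspect the loaded modules of the running game process.
api = check_graphics_api("GTA5.exe")
print(f"Detected graphics API: {api}" if api else "No known graphics DLL found.")
```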
|
|
|
# Get the target application's process object |
|
|
|
```python |
|
def get_target_app_process(target_app_name): |
|
return next( |
|
( |
|
process |
|
for process in psutil.process_iter(['name']) |
|
if process.info['name'] == target_app_name |
|
), |
|
None, |
|
) |
|
``` |
|
|
|
# Attach the AI to the application's process by PID |
|
|
|
```python |
|
def attach_ai_to_app_pid(target_app_process): |
|
if target_app_process is not None: |
|
print(f"AI is attached to the application's process with PID: {target_app_process.pid}") |
|
return True |
|
else: |
|
print("Could not find the target application's process to attach the AI.") |
|
return False |
|
``` |
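A short sketch tying the two helpers together; here "attaching" only means confirming that a PID was found for the target process:

```python
# Look up the game process and report whether the AI could be attached to it.
target_process = get_target_app_process("GTA5.exe")
attached = attach_ai_to_app_pid(target_process)
```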
|
|
|
# Check if the targeted application is running |
|
|
|
```python |
|
def is_target_app_running(target_app_name): |
|
return any( |
|
process.info['name'] == target_app_name |
|
for process in psutil.process_iter(['name']) |
|
) |
|
``` |
|
|
|
# Create the directory if it doesn't exist |
|
|
|
```python |
|
directory = r"G:\Epic Games\GTAV\GTA5_AI\trained_models"
os.makedirs(directory, exist_ok=True)
|
``` |
|
|
|
# Define the neural network model |
|
|
|
```python |
|
class NanoCircuit(nn.Module): |
|
def __init__(self): |
|
super(NanoCircuit, self).__init__() |
|
self.fc1 = nn.Linear(784, 128) |
|
self.fc2 = nn.Linear(128, 10) |
|
|
|
def forward(self, x): |
|
x = x.view(-1, 784) # Reshape the input from (batch_size, 28, 28) to (batch_size, 784) |
|
x = torch.relu(self.fc1(x)) |
|
x = self.fc2(x) |
|
return x |
|
``` |
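A quick shape check on a dummy batch confirms the flatten-then-classify layout (28x28 inputs in, 10 logits out):

```python
# Sanity check: four fake 28x28 images should yield four 10-way logit vectors.
dummy_batch = torch.randn(4, 1, 28, 28)
logits = NanoCircuit()(dummy_batch)
print(logits.shape)  # torch.Size([4, 10])
```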
|
|
|
# Set the device to GPU if available |
|
|
|
```python |
|
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
|
``` |
|
|
|
# Load the MNIST dataset |
|
|
|
```python |
|
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]) |
|
train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform) |
|
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True) |
|
``` |
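Pulling one batch is a cheap way to confirm shapes and normalization before starting a long capped training run:

```python
# Peek at a single batch: images are (64, 1, 28, 28) normalized floats, labels are class indices.
images, labels = next(iter(train_loader))
print(images.shape, labels.shape, images.mean().item())
```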
|
|
|
# Initialize the model and move it to the GPU |
|
|
|
```python |
|
model = NanoCircuit().to(device) |
|
criterion = nn.CrossEntropyLoss() |
|
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9) |
|
``` |
|
|
|
# Train the model on the GPU with a data cap |
|
|
|
```python |
|
def train_with_data_cap(model, data_loader, criterion, optimizer, device, data_cap_gb): |
|
data_processed = 0 |
|
data_cap_bytes = data_cap_gb * (1024 ** 3) |
|
epoch = 0 |
|
|
|
while data_processed < data_cap_bytes: |
|
running_loss = 0.0 |
|
for i, data in enumerate(data_loader, 0): |
|
inputs, labels = data |
|
inputs, labels = inputs.to(device), labels.to(device) |
|
|
|
# Update the amount of data processed |
|
data_processed += inputs.nelement() * inputs.element_size() |
|
if data_processed >= data_cap_bytes: |
|
break |
|
|
|
optimizer.zero_grad() |
|
|
|
outputs = model(inputs.view(-1, 28 * 28)) |
|
loss = criterion(outputs, labels) |
|
loss.backward() |
|
optimizer.step() |
|
|
|
running_loss += loss.item() |
|
|
|
epoch += 1 |
|
print(f"Epoch {epoch}, Loss: {running_loss / (i + 1)}") |
|
print(f"Data processed: {data_processed / (1024 ** 3):.2f} GB") |
|
|
|
return model |
|
``` |
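The loop above only reports training loss. As a sketch, accuracy on the MNIST test split gives a number that is easier to compare across runs; it reuses the `transform` defined above:

```python
# Optional evaluation on the MNIST test split.
test_dataset = datasets.MNIST(root='./data', train=False, download=True, transform=transform)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=256, shuffle=False)

def evaluate(model, loader, device):
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)
            preds = model(images).argmax(dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)
    model.train()
    return correct / total

# After training: print(f"Test accuracy: {evaluate(trained_model, test_loader, device):.3f}")
```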
|
|
|
# Save the updated model as a .onnx file |
|
|
|
```python |
|
def save_model(model, filepath): |
|
dummy_input = torch.randn(1, 1, 28, 28).to(device) |
|
torch.onnx.export(model, dummy_input, filepath, input_names=['input'], output_names=['output'], opset_version=11) |
|
``` |
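`onnxruntime` is imported at the top but never used; a minimal verification sketch loads the exported file back and compares it against the PyTorch model on a random input:

```python
# Check that the exported ONNX graph matches the PyTorch model on a random input.
def check_onnx_export(model, filepath, device):
    model.eval()
    dummy = torch.randn(1, 1, 28, 28).to(device)
    session = ort.InferenceSession(filepath)
    onnx_out = session.run(None, {'input': dummy.cpu().numpy()})[0]
    with torch.no_grad():
        torch_out = model(dummy).cpu().numpy()
    print("Max abs difference:", np.abs(onnx_out - torch_out).max())
```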
|
|
|
# Train the model with a 50 GB data cap and keep it updated while the application runs
|
|
|
```python |
|
trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=50) |
|
save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx')) |
|
|
|
target_app_name = "GTA5_TRAINED.exe" |
|
save_interval_seconds = 5 * 60 |
|
application_was_running = False |
|
while True: |
|
if is_target_app_running(target_app_name): |
|
print("Target application is running. Training and updating the model...") |
|
trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=.1) |
|
save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx')) |
|
application_was_running = True |
|
elif application_was_running: |
|
print("Target application has exited. Saving the model...") |
|
save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx')) |
|
print("Finished training and saved the model.") |
|
break |
|
else: |
|
print("Target application is not running. Waiting to start training and updating the model...") |
|
|
|
time.sleep(save_interval_seconds) |
|
|
|
def train_with_data_cap(model, data_loader, criterion, optimizer, device, data_cap_gb):
    # Redefinition of the earlier training loop that additionally cross-checks the loss
    # with numexpr. numexpr operates on NumPy arrays and cannot take part in autograd,
    # so the PyTorch loss is still the one used for backpropagation.
    data_processed = 0
    data_cap_bytes = data_cap_gb * (1024 ** 3)
    epoch = 0

    while data_processed < data_cap_bytes:
        running_loss = 0.0
        ne_running_loss = 0.0
        for i, data in enumerate(data_loader, 0):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)

            # Update the amount of data processed
            data_processed += inputs.nelement() * inputs.element_size()
            if data_processed >= data_cap_bytes:
                break

            optimizer.zero_grad()

            # Forward pass and loss in PyTorch (kept on the autograd graph)
            outputs = model(inputs.view(-1, 28 * 28))
            loss = criterion(outputs, labels)

            # Recompute the negative log-likelihood with numexpr on detached copies,
            # purely as a monitoring value
            log_probs = torch.log_softmax(outputs, dim=1).cpu().detach().numpy()
            labels_np = labels.cpu().numpy()
            picked = log_probs[np.arange(log_probs.shape[0]), labels_np]
            ne_running_loss += float(ne.evaluate("sum(-picked)")) / len(labels_np)

            # Backpropagate and update the model parameters
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        epoch += 1
        print(f"Epoch {epoch}, Loss: {running_loss / (i + 1)}, numexpr check: {ne_running_loss / (i + 1):.4f}")
        print(f"Data processed: {data_processed / (1024 ** 3):.2f} GB")

    return model
|
``` |
|
|
|
# Train the model with a 10 GB data cap |
|
|
|
```python |
|
trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=10)
|
save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx')) |
|
|
|
target_app_name = "GTA5.exe" |
|
save_interval_seconds = 5 * 60 |
|
application_was_running = False |
|
while True: |
|
if is_target_app_running(target_app_name): |
|
print("Target application is running. Training and updating the model...") |
|
        trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=10)
|
save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx')) |
|
application_was_running = True |
|
elif application_was_running: |
|
print("Target application has exited. Saving the model...") |
|
save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx')) |
|
print("Finished training and saved the model.") |
|
break |
|
else: |
|
print("Target application is not running. Waiting to start training and updating the model...") |
|
|
|
time.sleep(save_interval_seconds) |
|
|
|
|
|
|
target_app_name = "GTA5.exe" |
|
save_interval_seconds = 1 * 60 |
|
application_was_running = False |
|
|
|
while True: |
|
if is_target_app_running(target_app_name): |
|
print("Target application is running. Training and updating the model...") |
|
trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=10) |
|
save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx')) |
|
application_was_running = True |
|
elif application_was_running: |
|
print("Target application has exited. Saving the model...") |
|
save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx')) |
|
print("Finished training and saved the model.") |
|
break |
|
else: |
|
start_time = time.time() |
|
print("Target application is not running. Waiting to detect the graphics API...") |
|
while (time.time() - start_time) < 5: |
|
if is_target_app_running(target_app_name): |
|
if graphics_api := check_graphics_api(target_app_name): |
|
print(f"Detected {graphics_api} in the target application.") |
|
break |
|
else: |
|
print("Could not detect the graphics API used in the target application.") |
|
time.sleep(1) |
|
|
|
if not is_target_app_running(target_app_name): |
|
print("Target application not detected in 5 seconds. Shutting down the AI.") |
|
break |
|
|
|
|
|
while True:
    if is_target_app_running(target_app_name):
        if graphics_api := check_graphics_api(target_app_name):
            print(f"Detected {graphics_api} in the target application.")
        else:
            print("Could not detect the graphics API used in the target application.")
        time.sleep(save_interval_seconds)
    else:
        start_time = time.time()
        print("Target application is not running. Waiting to start training and updating the model...")
        while (time.time() - start_time) < 5:
            if is_target_app_running(target_app_name):
                if graphics_api := check_graphics_api(target_app_name):
                    print(f"Detected {graphics_api} in the target application.")
                break
            time.sleep(1)

        if not is_target_app_running(target_app_name):
            print("Target application not detected in 5 seconds. Shutting down the AI.")
            break
|
``` |
|
|
|
# Generate some random data for the boxplots |
|
|
|
```python |
|
np.random.seed(0) |
|
original_data = np.random.normal(0, 1, 100) |
|
trained_data = np.random.normal(0.5, 1, 100) |
|
|
|
while True: |
|
if is_target_app_running(target_app_name): |
|
print("Target application is running. Training and updating the model...") |
|
trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=10) |
|
save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx')) |
|
|
|
        # Create a box plot of the original and trained data
        plt.figure()
        plt.boxplot([original_data, trained_data], labels=["Original Data", "Trained Data"])
        plt.title("Boxplot of Original and Trained Data")
        plt.ylabel("Values")

        # Save the box plot as an image before showing it (saving after show() can write an empty figure)
        plt.savefig(r"G:\Epic Games\GTAV\GTA5_AI\Plot Box Comparison\boxplot_comparison.png")
        plt.show()
|
|
|
application_was_running = True |
|
elif application_was_running: |
|
print("Target application has exited. Saving the model...") |
|
save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx')) |
|
print("Finished training and saved the model.") |
|
break |
|
else: |
|
start_time = time.time() |
|
print("Target application is not running. Waiting to detect the graphics API...") |
|
while (time.time() - start_time) < 5: |
|
if is_target_app_running(target_app_name): |
|
if graphics_api := check_graphics_api(target_app_name): |
|
print(f"Detected {graphics_api} in the target application.") |
|
break |
|
else: |
|
print("Could not detect the graphics API used in the target application.") |
|
time.sleep(1) |
|
|
|
if not is_target_app_running(target_app_name): |
|
print("Target application not detected in 5 seconds. Shutting down the AI.") |
|
break |
|
``` |