license: gpl-3.0
pipeline_tag: graph-ml
tags:
- code
import contextlib import os from matplotlib import pyplot as plt import numpy as np import torch import torch.nn as nn import torch.optim as optim import requests from torchvision import datasets, transforms import psutil import time import subprocess import onnxruntime as ort import matplotlib.pyplot as plt import numpy as np import numexpr as ne
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
tokenizer = AutoTokenizer.from_pretrained("janpase97/codeformer-pretrained")
model = AutoModelForSeq2SeqLM.from_pretrained("janpase97/codeformer-pretrained")
def check_graphics_api(target_app_name): graphics_api = None
with contextlib.suppress(subprocess.CalledProcessError):
output = subprocess.check_output(['tasklist', '/FI', f'imagename eq {target_app_name}', '/M']).decode('utf-8')
if "opengl32.dll" in output:
graphics_api = "OpenGL"
elif "d3d11.dll" in output:
graphics_api = "DirectX11"
elif "d3d12.dll" in output:
graphics_api = "DirectX12"
elif "vulkan" in output:
graphics_api = "VULKAN"
return graphics_api
Get the target application's process object
def get_target_app_process(target_app_name): return next( ( process for process in psutil.process_iter(['name']) if process.info['name'] == target_app_name ), None, )
Attach the AI to the application's process by PID
def attach_ai_to_app_pid(target_app_process): if target_app_process is not None: print(f"AI is attached to the application's process with PID: {target_app_process.pid}") return True else: print("Could not find the target application's process to attach the AI.") return False
Check if the targeted application is running
def is_target_app_running(target_app_name): return any( process.info['name'] == target_app_name for process in psutil.process_iter(['name']) )
Create the directory if it doesn't exist
directory = r"G:\Epic Games\GTAV\GTA5_AI\trained_models" if not os.path.exists(directory): os.makedirs(directory)
Define the neural network model
class NanoCircuit(nn.Module): def init(self): super(NanoCircuit, self).init() self.fc1 = nn.Linear(784, 128) self.fc2 = nn.Linear(128, 10)
def forward(self, x):
x = x.view(-1, 784) # Reshape the input from (batch_size, 28, 28) to (batch_size, 784)
x = torch.relu(self.fc1(x))
x = self.fc2(x)
return x
Set the device to GPU if available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
Load the MNIST dataset
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))]) train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform) train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)
Initialize the model and move it to the GPU
model = NanoCircuit().to(device) criterion = nn.CrossEntropyLoss() optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)
Train the model on the GPU with a data cap
def train_with_data_cap(model, data_loader, criterion, optimizer, device, data_cap_gb): data_processed = 0 data_cap_bytes = data_cap_gb * (1024 ** 3) epoch = 0
while data_processed < data_cap_bytes:
running_loss = 0.0
for i, data in enumerate(data_loader, 0):
inputs, labels = data
inputs, labels = inputs.to(device), labels.to(device)
# Update the amount of data processed
data_processed += inputs.nelement() * inputs.element_size()
if data_processed >= data_cap_bytes:
break
optimizer.zero_grad()
outputs = model(inputs.view(-1, 28 * 28))
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
epoch += 1
print(f"Epoch {epoch}, Loss: {running_loss / (i + 1)}")
print(f"Data processed: {data_processed / (1024 ** 3):.2f} GB")
return model
Save the updated model as a .onnx file
def save_model(model, filepath): dummy_input = torch.randn(1, 1, 28, 28).to(device) torch.onnx.export(model, dummy_input, filepath, input_names=['input'], output_names=['output'], opset_version=11)
Train the model with a 1 GB data cap
trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=50) save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx'))
target_app_name = "GTA5_TRAINED.exe" save_interval_seconds = 5 * 60 application_was_running = False while True: if is_target_app_running(target_app_name): print("Target application is running. Training and updating the model...") trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=.1) save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx')) application_was_running = True elif application_was_running: print("Target application has exited. Saving the model...") save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx')) print("Finished training and saved the model.") break else: print("Target application is not running. Waiting to start training and updating the model...")
time.sleep(save_interval_seconds)
def train_with_data_cap(model, data_loader, criterion, optimizer, device, data_cap_gb): data_processed = 0 data_cap_bytes = data_cap_gb * (1024 ** 3) epoch = 0
while data_processed < data_cap_bytes:
running_loss = 0.0
for i, data in enumerate(data_loader, 0):
inputs, labels = data
inputs, labels = inputs.to(device), labels.to(device)
# Update the amount of data processed
data_processed += inputs.nelement() * inputs.element_size()
if data_processed >= data_cap_bytes:
break
optimizer.zero_grad()
# Compute the outputs and loss using numexpr
outputs = model(inputs.view(-1, 28 * 28))
outputs = outputs.cpu().detach().numpy()
labels = labels.cpu().detach().numpy()
loss = ne.evaluate("sum(-log(outputs[arange(outputs.shape[0]), labels]))") / len(labels)
# Backpropagate and update the model parameters
ne.evaluate("loss", out=loss)
grad_outputs = np.ones_like(outputs)
grad_outputs[np.arange(grad_outputs.shape[0]), labels] = -1
grad_outputs /= len(labels)
grad_outputs = ne.evaluate("grad_outputs * loss_grad")
grad_outputs = torch.from_numpy(grad_outputs).to(device)
outputs = torch.from_numpy(outputs).to(device)
loss.backward(grad_outputs)
optimizer.step()
running_loss += loss.item()
epoch += 1
print(f"Epoch {epoch}, Loss: {running_loss / (i + 1)}")
print(f"Data processed: {data_processed / (1024 ** 3):.2f} GB")
return model
Train the model with a 10 GB data cap
trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, os.device_encoding, data_cap_gb=10) save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx'))
target_app_name = "GTA5.exe" save_interval_seconds = 5 * 60 application_was_running = False while True: if is_target_app_running(target_app_name): print("Target application is running. Training and updating the model...") trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, os.device_encoding, data_cap_gb=10) save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx')) application_was_running = True elif application_was_running: print("Target application has exited. Saving the model...") save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx')) print("Finished training and saved the model.") break else: print("Target application is not running. Waiting to start training and updating the model...")
time.sleep(save_interval_seconds)
def train_with_data_cap(model, data_loader, criterion, optimizer, device, data_cap_gb): data_processed = 0 data_cap_bytes = data_cap_gb * (1024 ** 3) epoch = 0
while data_processed < data_cap_bytes:
running_loss = 0.0
for i, data in enumerate(data_loader, 0):
inputs, labels = data
inputs, labels = inputs.to(device), labels.to(device)
# Update the amount of data processed
data_processed += inputs.nelement() * inputs.element_size()
if data_processed >= data_cap_bytes:
break
optimizer.zero_grad()
# Compute the outputs and loss using numexpr
outputs = model(inputs.view(-1, 28 * 28))
outputs = outputs.cpu().detach().numpy()
labels = labels.cpu().detach().numpy()
loss = ne.evaluate("sum(-log(outputs[arange(outputs.shape[0]), labels]))") / len(labels)
# Backpropagate and update the model parameters
ne.evaluate("loss", out=loss)
grad_outputs = np.ones_like(outputs)
grad_outputs[np.arange(grad_outputs.shape[0]), labels] = -1
grad_outputs /= len(labels)
grad_outputs = ne.evaluate("grad_outputs * loss_grad")
grad_outputs = torch.from_numpy(grad_outputs).to(device)
outputs = torch.from_numpy(outputs).to(device)
loss.backward(grad_outputs)
optimizer.step()
running_loss += loss.item()
epoch += 1
print(f"Epoch {epoch}, Loss: {running_loss / (i + 1)}")
print(f"Data processed: {data_processed / (1024 ** 3):.2f} GB")
return model
target_app_name = "GTA5.exe" save_interval_seconds = 1 * 60 application_was_running = False
while True: if is_target_app_running(target_app_name): print("Target application is running. Training and updating the model...") trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=10) save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx')) application_was_running = True elif application_was_running: print("Target application has exited. Saving the model...") save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx')) print("Finished training and saved the model.") break else: start_time = time.time() print("Target application is not running. Waiting to detect the graphics API...") while (time.time() - start_time) < 5: if is_target_app_running(target_app_name): if graphics_api := check_graphics_api(target_app_name): print(f"Detected {graphics_api} in the target application.") break else: print("Could not detect the graphics API used in the target application.") time.sleep(1)
if not is_target_app_running(target_app_name):
print("Target application not detected in 5 seconds. Shutting down the AI.")
break
while True: if is_target_app_running(target_app_name): if graphics_api := check_graphics_api(target_app_name): print(f"Detected {graphics_api} in the target application.") else: print("Could not detect the graphics API used in the target application.") else: start_time = time.time() print("Target application is not running. Waiting to start training and updating the model...") while (time.time() - start_time) < 5: if is_target_app_running(target_app_name): print(f"Detected {graphics_api} in the target application.") break time.sleep(1)
if not is_target_app_running(target_app_name):
print("Target application not detected in 5 seconds. Shutting down the AI.")
break
#Generate some random data for the boxplots np.random.seed(0) original_data = np.random.normal(0, 1, 100) trained_data = np.random.normal(0.5, 1, 100)
while True: if is_target_app_running(target_app_name): print("Target application is running. Training and updating the model...") trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=10) save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx'))
# Create a box plot of the original and trained data
plt.figure()
plt.boxplot([original_data, trained_data], labels=["Original Data", "Trained Data"])
plt.title("Boxplot of Original and Trained Data")
plt.ylabel("Values")
plt.show()
# Save the box plot as an image
plt.savefig(r"G:\Epic Games\GTAV\GTA5_AI\Plot Box Comparison\boxplot_comparison.png")
application_was_running = True
elif application_was_running:
print("Target application has exited. Saving the model...")
save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx'))
print("Finished training and saved the model.")
break
else:
start_time = time.time()
print("Target application is not running. Waiting to detect the graphics API...")
while (time.time() - start_time) < 5:
if is_target_app_running(target_app_name):
if graphics_api := check_graphics_api(target_app_name):
print(f"Detected {graphics_api} in the target application.")
break
else:
print("Could not detect the graphics API used in the target application.")
time.sleep(1)
if not is_target_app_running(target_app_name):
print("Target application not detected in 5 seconds. Shutting down the AI.")
break